diff --git a/server/storage/backend/backend_mysql.go b/server/storage/backend/backend_mysql.go index 96f82dc20..38312ce7a 100644 --- a/server/storage/backend/backend_mysql.go +++ b/server/storage/backend/backend_mysql.go @@ -24,7 +24,7 @@ type mysqlBackend struct { func newMySQLBackend(bcfg BackendConfig) (*mysqlBackend, error) { db, err := sql.Open("mysql", bcfg.MySQLDSN) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to open MySQL connection: %w", err) } // Set connection pool settings @@ -42,7 +42,8 @@ func newMySQLBackend(bcfg BackendConfig) (*mysqlBackend, error) { // Initialize tables if err := be.initTables(); err != nil { - return nil, err + db.Close() + return nil, fmt.Errorf("failed to initialize tables: %w", err) } return be, nil @@ -58,7 +59,10 @@ func (m *mysqlBackend) initTables() error { version BIGINT ) `) - return err + if err != nil { + return fmt.Errorf("failed to create kv_store table: %w", err) + } + return nil } func (m *mysqlBackend) BatchTx() BatchTx { @@ -74,19 +78,22 @@ func (m *mysqlBackend) ConcurrentReadTx() ReadTx { } func (m *mysqlBackend) Snapshot() Snapshot { - return &mysqlSnapshot{be: m} + id := fmt.Sprintf("%d", time.Now().UnixNano()) + _, err := m.db.Exec(fmt.Sprintf("CREATE TABLE snapshot_%s AS SELECT * FROM kv_store", id)) + if err != nil { + m.lg.Error("failed to create snapshot", zap.Error(err)) + return nil + } + return &mysqlSnapshot{be: m, id: id} } func (m *mysqlBackend) Hash(ignores func([]byte, []byte) bool) (uint32, error) { - // Implement hash calculation for MySQL - // This is a placeholder implementation; you should replace it with your actual logic. return 0, fmt.Errorf("Hash not implemented for MySQL backend") } func (m *mysqlBackend) Size() int64 { var size int64 - row := m.db.QueryRow("SELECT SUM(DATA_LENGTH + INDEX_LENGTH) FROM information_schema.TABLES WHERE TABLE_SCHEMA = DATABASE()") - err := row.Scan(&size) + err := m.db.QueryRow("SELECT SUM(DATA_LENGTH + INDEX_LENGTH) FROM information_schema.TABLES WHERE TABLE_SCHEMA = DATABASE()").Scan(&size) if err != nil { m.lg.Error("failed to get database size", zap.Error(err)) return 0 @@ -99,46 +106,112 @@ func (m *mysqlBackend) SizeInUse() int64 { } func (m *mysqlBackend) OpenReadTxN() int64 { - // MySQL doesn't have a concept of read transactions, so return 0 - return 0 + return 0 // MySQL doesn't have a concept of read transactions } func (m *mysqlBackend) Defrag() error { - // MySQL handles fragmentation internally, so this is a no-op - return nil + m.lg.Info("Defrag called on MySQL backend (no-op)") + return nil // MySQL handles fragmentation internally } func (m *mysqlBackend) ForceCommit() { + m.lg.Info("ForceCommit called on MySQL backend (no-op)") // MySQL commits automatically, so this is a no-op } func (m *mysqlBackend) Close() error { close(m.stopc) <-m.donec - return m.db.Close() + if err := m.db.Close(); err != nil { + return fmt.Errorf("failed to close MySQL connection: %w", err) + } + return nil +} + +func (m *mysqlBackend) RestoreSnapshot(r io.Reader) error { + tx, err := m.db.Begin() + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + defer tx.Rollback() + + if _, err = tx.Exec("DELETE FROM kv_store"); err != nil { + return fmt.Errorf("failed to clear kv_store: %w", err) + } + + stmt, err := tx.Prepare("INSERT INTO kv_store (key, value) VALUES (?, ?)") + if err != nil { + return fmt.Errorf("failed to prepare insert statement: %w", err) + } + defer stmt.Close() + + buf := make([]byte, 8192) + for { + n, err := r.Read(buf) + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("failed to read from snapshot: %w", err) + } + key := buf[:n/2] + value := buf[n/2:n] + if _, err = stmt.Exec(key, value); err != nil { + return fmt.Errorf("failed to insert key-value pair: %w", err) + } + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + return nil +} + +func (m *mysqlBackend) Backup(w io.Writer) error { + rows, err := m.db.Query("SELECT key, value FROM kv_store") + if err != nil { + return fmt.Errorf("failed to query kv_store: %w", err) + } + defer rows.Close() + + for rows.Next() { + var key, value []byte + if err := rows.Scan(&key, &value); err != nil { + return fmt.Errorf("failed to scan row: %w", err) + } + if _, err := w.Write(key); err != nil { + return fmt.Errorf("failed to write key: %w", err) + } + if _, err := w.Write(value); err != nil { + return fmt.Errorf("failed to write value: %w", err) + } + } + if err := rows.Err(); err != nil { + return fmt.Errorf("error during row iteration: %w", err) + } + return nil +} + +func (m *mysqlBackend) RestoreBackup(r io.Reader) error { + return m.RestoreSnapshot(r) // The process is the same as restoring from a snapshot } func (m *mysqlBackend) SetTxPostLockInsideApplyHook(hook func()) { + m.mu.Lock() + defer m.mu.Unlock() m.postLockInsideApplyHook = hook } -// mysqlBatchTx implements BatchTx interface type mysqlBatchTx struct { be *mysqlBackend tx *sql.Tx } -func (t *mysqlBatchTx) Lock() { - t.be.mu.Lock() -} - -func (t *mysqlBatchTx) Unlock() { - t.be.mu.Unlock() -} +func (t *mysqlBatchTx) Lock() { t.be.mu.Lock() } +func (t *mysqlBatchTx) Unlock() { t.be.mu.Unlock() } func (t *mysqlBatchTx) UnsafeCreateBucket(bucket Bucket) { - // MySQL doesn't use buckets, so this is a no-op - t.be.lg.Warn("UnsafeCreateBucket called on MySQL backend", zap.String("bucket", "n/a")) + t.be.lg.Warn("UnsafeCreateBucket called on MySQL backend (no-op)", zap.String("bucket", bucket.String())) } func (t *mysqlBatchTx) UnsafePut(bucket Bucket, key []byte, value []byte) { @@ -152,7 +225,7 @@ func (t *mysqlBatchTx) UnsafePut(bucket Bucket, key []byte, value []byte) { } _, err := t.tx.Exec("INSERT INTO kv_store (key, value) VALUES (?, ?) ON DUPLICATE KEY UPDATE value = ?", key, value, value) if err != nil { - t.be.lg.Error("failed to put key-value pair", zap.Error(err)) + t.be.lg.Error("failed to put key-value pair", zap.Error(err), zap.Binary("key", key)) } } @@ -171,29 +244,28 @@ func (t *mysqlBatchTx) UnsafeDelete(bucket Bucket, key []byte) { } _, err := t.tx.Exec("DELETE FROM kv_store WHERE key = ?", key) if err != nil { - t.be.lg.Error("failed to delete key", zap.Error(err)) + t.be.lg.Error("failed to delete key", zap.Error(err), zap.Binary("key", key)) } } func (t *mysqlBatchTx) UnsafeDeleteBucket(bucket Bucket) { - t.be.lg.Warn("UnsafeDeleteBucket called on MySQL backend", zap.String("bucket", "n/a")) - // No-op for MySQL as it doesn't use buckets + t.be.lg.Warn("UnsafeDeleteBucket called on MySQL backend (no-op)", zap.String("bucket", bucket.String())) } func (t *mysqlBatchTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error { rows, err := t.be.db.Query("SELECT key, value FROM kv_store") if err != nil { - return err + return fmt.Errorf("failed to query kv_store: %w", err) } defer rows.Close() for rows.Next() { var k, v []byte if err := rows.Scan(&k, &v); err != nil { - return err + return fmt.Errorf("failed to scan row: %w", err) } if err := visitor(k, v); err != nil { - return err + return fmt.Errorf("visitor function failed: %w", err) } } return rows.Err() @@ -204,7 +276,7 @@ func (t *mysqlBatchTx) UnsafeRange(bucket Bucket, key, endKey []byte, limit int6 query := "SELECT key, value FROM kv_store WHERE key >= ? AND key < ? ORDER BY key LIMIT ?" rows, err := t.be.db.Query(query, key, endKey, limit) if err != nil { - t.be.lg.Error("failed to query range", zap.Error(err)) + t.be.lg.Error("failed to query range", zap.Error(err), zap.Binary("start", key), zap.Binary("end", endKey)) return nil, nil } defer rows.Close() @@ -217,6 +289,9 @@ func (t *mysqlBatchTx) UnsafeRange(bucket Bucket, key, endKey []byte, limit int6 keys = append(keys, k) values = append(values, v) } + if err := rows.Err(); err != nil { + t.be.lg.Error("error during row iteration", zap.Error(err)) + } return keys, values } @@ -232,7 +307,6 @@ func (t *mysqlBatchTx) Commit() { func (t *mysqlBatchTx) CommitAndStop() { t.Commit() - // Additional cleanup if needed } func (t *mysqlBatchTx) LockInsideApply() { @@ -246,7 +320,6 @@ func (t *mysqlBatchTx) LockOutsideApply() { t.be.mu.Lock() } -// mysqlReadTx implements ReadTx interface type mysqlReadTx struct { be *mysqlBackend } @@ -262,7 +335,7 @@ func (t *mysqlReadTx) UnsafeRange(bucket Bucket, key, endKey []byte, limit int64 query := "SELECT key, value FROM kv_store WHERE key >= ? AND key < ? ORDER BY key LIMIT ?" rows, err := t.be.db.Query(query, key, endKey, limit) if err != nil { - t.be.lg.Error("failed to query range", zap.Error(err)) + t.be.lg.Error("failed to query range", zap.Error(err), zap.Binary("start", key), zap.Binary("end", endKey)) return nil, nil } defer rows.Close() @@ -275,51 +348,80 @@ func (t *mysqlReadTx) UnsafeRange(bucket Bucket, key, endKey []byte, limit int64 keys = append(keys, k) values = append(values, v) } - return keys, values -} - -func (t *mysqlReadTx) UnsafeGet(bucket Bucket, key []byte) (value []byte, err error) { - err = t.be.db.QueryRow("SELECT value FROM kv_store WHERE key = ?", key).Scan(&value) - if err == sql.ErrNoRows { - return nil, nil + if err := rows.Err(); err != nil { + t.be.lg.Error("error during row iteration", zap.Error(err)) } - return + return keys, values } func (t *mysqlReadTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error { rows, err := t.be.db.Query("SELECT key, value FROM kv_store") if err != nil { - return err + return fmt.Errorf("failed to query kv_store: %w", err) } defer rows.Close() for rows.Next() { var k, v []byte if err := rows.Scan(&k, &v); err != nil { - return err + return fmt.Errorf("failed to scan row: %w", err) } if err := visitor(k, v); err != nil { - return err + return fmt.Errorf("visitor function failed: %w", err) } } return rows.Err() } -// mysqlSnapshot implements Snapshot interface type mysqlSnapshot struct { be *mysqlBackend + id string } func (s *mysqlSnapshot) Close() error { - // MySQL doesn't require explicit snapshot closing + _, err := s.be.db.Exec(fmt.Sprintf("DROP TABLE IF EXISTS snapshot_%s", s.id)) + if err != nil { + return fmt.Errorf("failed to drop snapshot table: %w", err) + } return nil } func (s *mysqlSnapshot) Size() int64 { - return s.be.Size() + var size int64 + err := s.be.db.QueryRow(fmt.Sprintf("SELECT SUM(LENGTH(key) + LENGTH(value)) FROM snapshot_%s", s.id)).Scan(&size) + if err != nil { + s.be.lg.Error("failed to get snapshot size", zap.Error(err)) + return 0 + } + return size } func (s *mysqlSnapshot) WriteTo(w io.Writer) (int64, error) { - // Implement snapshot writing logic - return 0, fmt.Errorf("WriteTo not implemented for MySQL snapshot") + rows, err := s.be.db.Query(fmt.Sprintf("SELECT key, value FROM snapshot_%s", s.id)) + if err != nil { + return 0, fmt.Errorf("failed to query snapshot: %w", err) + } + defer rows.Close() + + var total int64 + for rows.Next() { + var key, value []byte + if err := rows.Scan(&key, &value); err != nil { + return total, fmt.Errorf("failed to scan row: %w", err) + } + n, err := w.Write(key) + total += int64(n) + if err != nil { + return total, fmt.Errorf("failed to write key: %w", err) + } + n, err = w.Write(value) + total += int64(n) + if err != nil { + return total, fmt.Errorf("failed to write value: %w", err) + } + } + if err := rows.Err(); err != nil { + return total, fmt.Errorf("error during row iteration: %w", err) + } + return total, nil }