etcdserver: add clock to WAL struckt and altered some tests related to WAL records

wal test was changed due to the changes in WAL structure
in TestRepairWriteTearLast i changed number of expected entries from 40 to 29 because now one wal record takes morespace and therefore after truncating a wal file less records remain
Signed-off-by: Grigorii Kosov <kosov.gr@gmail.com>
Signed-off-by: KosovGrigorii <72564996+KosovGrigorii@users.noreply.github.com>
This commit is contained in:
KosovGrigorii 2024-01-12 14:56:40 +05:00
parent bdb249c7d5
commit 3dc1e4faa8
3 changed files with 27 additions and 13 deletions

View File

@ -125,7 +125,7 @@ func TestRepairWriteTearLast(t *testing.T) {
} }
return f.Truncate(offset) return f.Truncate(offset)
} }
testRepair(t, makeEnts(50), corruptf, 40) testRepair(t, makeEnts(50), corruptf, 29)
} }
// TestRepairWriteTearMiddle repairs the WAL when there is write tearing // TestRepairWriteTearMiddle repairs the WAL when there is write tearing

View File

@ -26,6 +26,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/jonboulle/clockwork"
"go.uber.org/zap" "go.uber.org/zap"
"go.etcd.io/etcd/client/pkg/v3/fileutil" "go.etcd.io/etcd/client/pkg/v3/fileutil"
@ -92,12 +94,14 @@ type WAL struct {
locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing) locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing)
fp *filePipeline fp *filePipeline
clock clockwork.Clock
} }
// Create creates a WAL ready for appending records. The given metadata is // Create creates a WAL ready for appending records. The given metadata is
// recorded at the head of each WAL file, and can be retrieved with ReadAll // recorded at the head of each WAL file, and can be retrieved with ReadAll
// after the file is Open. // after the file is Open.
func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) { func Create(lg *zap.Logger, dirpath string, metadata []byte, walClock ...clockwork.Clock) (*WAL, error) {
if Exist(dirpath) { if Exist(dirpath) {
return nil, os.ErrExist return nil, os.ErrExist
} }
@ -158,6 +162,12 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
dir: dirpath, dir: dirpath,
metadata: metadata, metadata: metadata,
} }
if walClock != nil {
w.clock = walClock[0]
} else {
w.clock = clockwork.NewRealClock()
}
w.encoder, err = newFileEncoder(f.File, 0) w.encoder, err = newFileEncoder(f.File, 0)
if err != nil { if err != nil {
return nil, err return nil, err
@ -166,7 +176,7 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
if err = w.saveCrc(0); err != nil { if err = w.saveCrc(0); err != nil {
return nil, err return nil, err
} }
if err = w.encoder.encode(&walpb.Record{Type: MetadataType, Data: metadata}); err != nil { if err = w.encoder.encode(&walpb.Record{Type: MetadataType, Data: metadata, CreatedAt: w.clock.Now().Unix()}); err != nil {
return nil, err return nil, err
} }
if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil { if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
@ -354,6 +364,7 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool
decoder: NewDecoder(rs...), decoder: NewDecoder(rs...),
readClose: closer, readClose: closer,
locks: ls, locks: ls,
clock: clockwork.NewRealClock(),
} }
if write { if write {
@ -748,7 +759,7 @@ func (w *WAL) cut() error {
return err return err
} }
if err = w.encoder.encode(&walpb.Record{Type: MetadataType, Data: w.metadata}); err != nil { if err = w.encoder.encode(&walpb.Record{Type: MetadataType, Data: w.metadata, CreatedAt: w.clock.Now().Unix()}); err != nil {
return err return err
} }
@ -905,7 +916,7 @@ func (w *WAL) Close() error {
func (w *WAL) saveEntry(e *raftpb.Entry) error { func (w *WAL) saveEntry(e *raftpb.Entry) error {
// TODO: add MustMarshalTo to reduce one allocation. // TODO: add MustMarshalTo to reduce one allocation.
b := pbutil.MustMarshal(e) b := pbutil.MustMarshal(e)
rec := &walpb.Record{Type: EntryType, Data: b} rec := &walpb.Record{Type: EntryType, Data: b, CreatedAt: w.clock.Now().Unix()}
if err := w.encoder.encode(rec); err != nil { if err := w.encoder.encode(rec); err != nil {
return err return err
} }
@ -919,7 +930,7 @@ func (w *WAL) saveState(s *raftpb.HardState) error {
} }
w.state = *s w.state = *s
b := pbutil.MustMarshal(s) b := pbutil.MustMarshal(s)
rec := &walpb.Record{Type: StateType, Data: b} rec := &walpb.Record{Type: StateType, Data: b, CreatedAt: w.clock.Now().Unix()}
return w.encoder.encode(rec) return w.encoder.encode(rec)
} }
@ -971,7 +982,7 @@ func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
w.mu.Lock() w.mu.Lock()
defer w.mu.Unlock() defer w.mu.Unlock()
rec := &walpb.Record{Type: SnapshotType, Data: b} rec := &walpb.Record{Type: SnapshotType, Data: b, CreatedAt: w.clock.Now().Unix()}
if err := w.encoder.encode(rec); err != nil { if err := w.encoder.encode(rec); err != nil {
return err return err
} }
@ -983,7 +994,7 @@ func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
} }
func (w *WAL) saveCrc(prevCrc uint32) error { func (w *WAL) saveCrc(prevCrc uint32) error {
return w.encoder.encode(&walpb.Record{Type: CrcType, Crc: prevCrc}) return w.encoder.encode(&walpb.Record{Type: CrcType, Crc: prevCrc, CreatedAt: w.clock.Now().Unix()})
} }
func (w *WAL) tail() *fileutil.LockedFile { func (w *WAL) tail() *fileutil.LockedFile {

View File

@ -29,6 +29,8 @@ import (
"strings" "strings"
"testing" "testing"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest" "go.uber.org/zap/zaptest"
@ -49,7 +51,7 @@ var (
func TestNew(t *testing.T) { func TestNew(t *testing.T) {
p := t.TempDir() p := t.TempDir()
w, err := Create(zaptest.NewLogger(t), p, []byte("somedata")) w, err := Create(zaptest.NewLogger(t), p, []byte("somedata"), clockwork.NewFakeClock())
if err != nil { if err != nil {
t.Fatalf("err = %v, want nil", err) t.Fatalf("err = %v, want nil", err)
} }
@ -75,17 +77,18 @@ func TestNew(t *testing.T) {
var wb bytes.Buffer var wb bytes.Buffer
e := newEncoder(&wb, 0, 0) e := newEncoder(&wb, 0, 0)
err = e.encode(&walpb.Record{Type: CrcType, Crc: 0}) err = e.encode(&walpb.Record{Type: CrcType, Crc: 0, CreatedAt: w.clock.Now().Unix()})
if err != nil { if err != nil {
t.Fatalf("err = %v, want nil", err) t.Fatalf("err = %v, want nil", err)
} }
err = e.encode(&walpb.Record{Type: MetadataType, Data: []byte("somedata")}) err = e.encode(&walpb.Record{Type: MetadataType, Data: []byte("somedata"), CreatedAt: w.clock.Now().Unix()})
if err != nil { if err != nil {
t.Fatalf("err = %v, want nil", err) t.Fatalf("err = %v, want nil", err)
} }
r := &walpb.Record{ r := &walpb.Record{
Type: SnapshotType, Type: SnapshotType,
Data: pbutil.MustMarshal(&walpb.Snapshot{}), Data: pbutil.MustMarshal(&walpb.Snapshot{}),
CreatedAt: w.clock.Now().Unix(),
} }
if err = e.encode(r); err != nil { if err = e.encode(r); err != nil {
t.Fatalf("err = %v, want nil", err) t.Fatalf("err = %v, want nil", err)