server: Written Snapshot's to WAL contains populated ConfState.

This will (among others) allow etcd-3.6 to not depend on store_v2 .snap files at all,
as WAL + db file will be self-sufficient.
This commit is contained in:
Piotr Tabor 2021-02-28 14:40:01 +01:00
parent fce0c192eb
commit 4d4c84e014
7 changed files with 108 additions and 46 deletions

View File

@ -109,7 +109,7 @@ func saveSnap(destSnap, srcSnap string) (walsnap walpb.Snapshot) {
log.Fatal(err)
}
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
walsnap.Index, walsnap.Term, walsnap.ConfState = snapshot.Metadata.Index, snapshot.Metadata.Term, &snapshot.Metadata.ConfState
newss := snap.New(zap.NewExample(), destSnap)
if err = newss.SaveSnap(*snapshot); err != nil {
log.Fatal(err)

View File

@ -387,6 +387,8 @@ func (s *v3Manager) saveDB() error {
}
// saveWALAndSnap creates a WAL for the initial cluster
//
// TODO: This code ignores learners !!!
func (s *v3Manager) saveWALAndSnap() error {
if err := fileutil.CreateDirAll(s.walDir); err != nil {
return err
@ -454,19 +456,20 @@ func (s *v3Manager) saveWALAndSnap() error {
if berr != nil {
return berr
}
confState := raftpb.ConfState{
Voters: nodeIDs,
}
raftSnap := raftpb.Snapshot{
Data: b,
Metadata: raftpb.SnapshotMetadata{
Index: commit,
Term: term,
ConfState: raftpb.ConfState{
Voters: nodeIDs,
},
Index: commit,
Term: term,
ConfState: confState,
},
}
sn := snap.New(s.lg, s.snapDir)
if err := sn.SaveSnap(raftSnap); err != nil {
return err
}
return w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term})
return w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term, ConfState: &confState})
}

View File

@ -54,8 +54,9 @@ func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
// SaveSnap saves the snapshot file to disk and writes the WAL snapshot entry.
func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
walsnap := walpb.Snapshot{
Index: snap.Metadata.Index,
Term: snap.Metadata.Term,
Index: snap.Metadata.Index,
Term: snap.Metadata.Term,
ConfState: &snap.Metadata.ConfState,
}
// save the snapshot file before writing the snapshot to the wal.
// This makes it possible for the snapshot file to become orphaned, but prevents

View File

@ -608,7 +608,6 @@ func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, erro
}
}
snaps = snaps[:n:n]
return snaps, nil
}
@ -944,6 +943,10 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
}
func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
if err := walpb.ValidateSnapshotForWrite(&e); err != nil {
return err
}
b := pbutil.MustMarshal(&e)
w.mu.Lock()

View File

@ -35,8 +35,15 @@ import (
"go.uber.org/zap"
)
var (
confState = raftpb.ConfState{
Voters: []uint64{0x00ffca74},
AutoLeave: false,
}
)
func TestNew(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -90,7 +97,7 @@ func TestNew(t *testing.T) {
}
func TestCreateFailFromPollutedDir(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -104,7 +111,7 @@ func TestCreateFailFromPollutedDir(t *testing.T) {
}
func TestWalCleanup(t *testing.T) {
testRoot, err := ioutil.TempDir(os.TempDir(), "waltestroot")
testRoot, err := ioutil.TempDir(t.TempDir(), "waltestroot")
if err != nil {
t.Fatal(err)
}
@ -135,7 +142,7 @@ func TestWalCleanup(t *testing.T) {
}
func TestCreateFailFromNoSpaceLeft(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -154,7 +161,7 @@ func TestCreateFailFromNoSpaceLeft(t *testing.T) {
}
func TestNewForInitedDir(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -167,7 +174,7 @@ func TestNewForInitedDir(t *testing.T) {
}
func TestOpenAtIndex(t *testing.T) {
dir, err := ioutil.TempDir(os.TempDir(), "waltest")
dir, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -210,7 +217,7 @@ func TestOpenAtIndex(t *testing.T) {
}
w.Close()
emptydir, err := ioutil.TempDir(os.TempDir(), "waltestempty")
emptydir, err := ioutil.TempDir(t.TempDir(), "waltestempty")
if err != nil {
t.Fatal(err)
}
@ -224,7 +231,7 @@ func TestOpenAtIndex(t *testing.T) {
// The test creates a WAL directory and cuts out multiple WAL files. Then
// it corrupts one of the files by completely truncating it.
func TestVerify(t *testing.T) {
walDir, err := ioutil.TempDir(os.TempDir(), "waltest")
walDir, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -273,7 +280,7 @@ func TestVerify(t *testing.T) {
// TODO: split it into smaller tests for better readability
func TestCut(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -304,7 +311,7 @@ func TestCut(t *testing.T) {
if err = w.cut(); err != nil {
t.Fatal(err)
}
snap := walpb.Snapshot{Index: 2, Term: 1}
snap := walpb.Snapshot{Index: 2, Term: 1, ConfState: &confState}
if err = w.SaveSnapshot(snap); err != nil {
t.Fatal(err)
}
@ -335,7 +342,7 @@ func TestCut(t *testing.T) {
}
func TestSaveWithCut(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -398,7 +405,7 @@ func TestSaveWithCut(t *testing.T) {
}
func TestRecover(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -513,7 +520,7 @@ func TestScanWalName(t *testing.T) {
}
func TestRecoverAfterCut(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -524,7 +531,7 @@ func TestRecoverAfterCut(t *testing.T) {
t.Fatal(err)
}
for i := 0; i < 10; i++ {
if err = md.SaveSnapshot(walpb.Snapshot{Index: uint64(i)}); err != nil {
if err = md.SaveSnapshot(walpb.Snapshot{Index: uint64(i), Term: 1, ConfState: &confState}); err != nil {
t.Fatal(err)
}
es := []raftpb.Entry{{Index: uint64(i)}}
@ -542,7 +549,7 @@ func TestRecoverAfterCut(t *testing.T) {
}
for i := 0; i < 10; i++ {
w, err := Open(zap.NewExample(), p, walpb.Snapshot{Index: uint64(i)})
w, err := Open(zap.NewExample(), p, walpb.Snapshot{Index: uint64(i), Term: 1})
if err != nil {
if i <= 4 {
if err != ErrFileNotFound {
@ -571,7 +578,7 @@ func TestRecoverAfterCut(t *testing.T) {
}
func TestOpenAtUncommittedIndex(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -605,7 +612,7 @@ func TestOpenAtUncommittedIndex(t *testing.T) {
// it releases the lock of part of data, and excepts that OpenForRead
// can read out all files even if some are locked for write.
func TestOpenForRead(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -646,7 +653,7 @@ func TestOpenForRead(t *testing.T) {
}
func TestOpenWithMaxIndex(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -689,7 +696,7 @@ func TestSaveEmpty(t *testing.T) {
}
func TestReleaseLockTo(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -761,7 +768,7 @@ func TestReleaseLockTo(t *testing.T) {
// TestTailWriteNoSlackSpace ensures that tail writes append if there's no preallocated space.
func TestTailWriteNoSlackSpace(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -827,7 +834,7 @@ func TestTailWriteNoSlackSpace(t *testing.T) {
// TestRestartCreateWal ensures that an interrupted WAL initialization is clobbered on restart
func TestRestartCreateWal(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -867,7 +874,7 @@ func TestOpenOnTornWrite(t *testing.T) {
clobberIdx := 20
overwriteEntries := 5
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -952,7 +959,7 @@ func TestOpenOnTornWrite(t *testing.T) {
}
func TestRenameFail(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -964,7 +971,7 @@ func TestRenameFail(t *testing.T) {
}()
SegmentSizeBytes = math.MaxInt64
tp, terr := ioutil.TempDir(os.TempDir(), "waltest")
tp, terr := ioutil.TempDir(t.TempDir(), "waltest")
if terr != nil {
t.Fatal(terr)
}
@ -982,7 +989,7 @@ func TestRenameFail(t *testing.T) {
// TestReadAllFail ensure ReadAll error if used without opening the WAL
func TestReadAllFail(t *testing.T) {
dir, err := ioutil.TempDir(os.TempDir(), "waltest")
dir, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -1004,18 +1011,18 @@ func TestReadAllFail(t *testing.T) {
// TestValidSnapshotEntries ensures ValidSnapshotEntries returns all valid wal snapshot entries, accounting
// for hardstate
func TestValidSnapshotEntries(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(p)
snap0 := walpb.Snapshot{Index: 0, Term: 0}
snap1 := walpb.Snapshot{Index: 1, Term: 1}
snap0 := walpb.Snapshot{}
snap1 := walpb.Snapshot{Index: 1, Term: 1, ConfState: &confState}
state1 := raftpb.HardState{Commit: 1, Term: 1}
snap2 := walpb.Snapshot{Index: 2, Term: 1}
snap3 := walpb.Snapshot{Index: 3, Term: 2}
snap2 := walpb.Snapshot{Index: 2, Term: 1, ConfState: &confState}
snap3 := walpb.Snapshot{Index: 3, Term: 2, ConfState: &confState}
state2 := raftpb.HardState{Commit: 3, Term: 2}
snap4 := walpb.Snapshot{Index: 4, Term: 2} // will be orphaned since the last committed entry will be snap3
snap4 := walpb.Snapshot{Index: 4, Term: 2, ConfState: &confState} // will be orphaned since the last committed entry will be snap3
func() {
w, err := Create(zap.NewExample(), p, nil)
if err != nil {
@ -1061,16 +1068,16 @@ func TestValidSnapshotEntriesAfterPurgeWal(t *testing.T) {
defer func() {
SegmentSizeBytes = oldSegmentSizeBytes
}()
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(p)
snap0 := walpb.Snapshot{Index: 0, Term: 0}
snap1 := walpb.Snapshot{Index: 1, Term: 1}
snap0 := walpb.Snapshot{}
snap1 := walpb.Snapshot{Index: 1, Term: 1, ConfState: &confState}
state1 := raftpb.HardState{Commit: 1, Term: 1}
snap2 := walpb.Snapshot{Index: 2, Term: 1}
snap3 := walpb.Snapshot{Index: 3, Term: 2}
snap2 := walpb.Snapshot{Index: 2, Term: 1, ConfState: &confState}
snap3 := walpb.Snapshot{Index: 3, Term: 2, ConfState: &confState}
state2 := raftpb.HardState{Commit: 3, Term: 2}
func() {
w, err := Create(zap.NewExample(), p, nil)

View File

@ -27,3 +27,15 @@ func (rec *Record) Validate(crc uint32) error {
rec.Reset()
return ErrCRCMismatch
}
// ValidateSnapshotForWrite ensures the Snapshot the newly written snapshot is valid.
//
// There might exist log-entries written by old etcd versions that does not conform
// to the requirements.
func ValidateSnapshotForWrite(e *Snapshot) error {
// Since etcd>=3.5.0
if e.ConfState == nil && e.Index > 0 {
return errors.New("Saved (not-initial) snapshot is missing ConfState: " + e.String())
}
return nil
}

View File

@ -0,0 +1,36 @@
package walpb
import (
"testing"
"github.com/golang/protobuf/descriptor"
"go.etcd.io/etcd/raft/v3/raftpb"
)
func TestSnapshotMetadataCompatibility(t *testing.T) {
_, snapshotMetadataMd := descriptor.ForMessage(&raftpb.SnapshotMetadata{})
_, snapshotMd := descriptor.ForMessage(&Snapshot{})
if len(snapshotMetadataMd.GetField()) != len(snapshotMd.GetField()) {
t.Errorf("Different number of fields in raftpb.SnapshotMetadata vs. walpb.Snapshot. " +
"They are supposed to be in sync.")
}
}
func TestValidateSnapshot(t *testing.T) {
tests := []struct {
name string
snap *Snapshot
wantErr bool
}{
{name: "empty", snap: &Snapshot{}, wantErr: false},
{name: "invalid", snap: &Snapshot{Index: 5, Term: 3}, wantErr: true},
{name: "valid", snap: &Snapshot{Index: 5, Term: 3, ConfState: &raftpb.ConfState{Voters: []uint64{0x00cad1}}}, wantErr: false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := ValidateSnapshotForWrite(tt.snap); (err != nil) != tt.wantErr {
t.Errorf("ValidateSnapshotForWrite() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}