diff --git a/contrib/raftexample/raft.go b/contrib/raftexample/raft.go index 9915128e0..3d0b93b8b 100644 --- a/contrib/raftexample/raft.go +++ b/contrib/raftexample/raft.go @@ -117,8 +117,9 @@ func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, func (rc *raftNode) saveSnap(snap raftpb.Snapshot) error { walSnap := walpb.Snapshot{ - Index: snap.Metadata.Index, - Term: snap.Metadata.Term, + Index: snap.Metadata.Index, + Term: snap.Metadata.Term, + ConfState: &snap.Metadata.ConfState, } // save the snapshot file before writing the snapshot to the wal. // This makes it possible for the snapshot file to become orphaned, but prevents diff --git a/etcdctl/ctlv2/command/backup_command.go b/etcdctl/ctlv2/command/backup_command.go index 2fbdbb36e..0dd3acb61 100644 --- a/etcdctl/ctlv2/command/backup_command.go +++ b/etcdctl/ctlv2/command/backup_command.go @@ -109,7 +109,7 @@ func saveSnap(destSnap, srcSnap string) (walsnap walpb.Snapshot) { log.Fatal(err) } if snapshot != nil { - walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term + walsnap.Index, walsnap.Term, walsnap.ConfState = snapshot.Metadata.Index, snapshot.Metadata.Term, &snapshot.Metadata.ConfState newss := snap.New(zap.NewExample(), destSnap) if err = newss.SaveSnap(*snapshot); err != nil { log.Fatal(err) diff --git a/etcdctl/snapshot/v3_snapshot.go b/etcdctl/snapshot/v3_snapshot.go index 0773ab7bd..5e07bed1f 100644 --- a/etcdctl/snapshot/v3_snapshot.go +++ b/etcdctl/snapshot/v3_snapshot.go @@ -387,6 +387,8 @@ func (s *v3Manager) saveDB() error { } // saveWALAndSnap creates a WAL for the initial cluster +// +// TODO: This code ignores learners !!! func (s *v3Manager) saveWALAndSnap() error { if err := fileutil.CreateDirAll(s.walDir); err != nil { return err @@ -454,19 +456,20 @@ func (s *v3Manager) saveWALAndSnap() error { if berr != nil { return berr } + confState := raftpb.ConfState{ + Voters: nodeIDs, + } raftSnap := raftpb.Snapshot{ Data: b, Metadata: raftpb.SnapshotMetadata{ - Index: commit, - Term: term, - ConfState: raftpb.ConfState{ - Voters: nodeIDs, - }, + Index: commit, + Term: term, + ConfState: confState, }, } sn := snap.New(s.lg, s.snapDir) if err := sn.SaveSnap(raftSnap); err != nil { return err } - return w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term}) + return w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term, ConfState: &confState}) } diff --git a/scripts/genproto.sh b/scripts/genproto.sh index 049646eb9..8b09b6dab 100755 --- a/scripts/genproto.sh +++ b/scripts/genproto.sh @@ -39,10 +39,11 @@ log_callout -e "\\nRunning gofast (gogo) proto generation..." for dir in ${DIRS}; do run pushd "${dir}" - run protoc --gofast_out=plugins=grpc:. -I=".:${GOGOPROTO_PATH}:${ETCD_ROOT_DIR}/..:${GRPC_GATEWAY_ROOT}/third_party/googleapis" \ + run protoc --gofast_out=plugins=grpc:. -I=".:${GOGOPROTO_PATH}:${ETCD_ROOT_DIR}/..:${ETCD_ROOT_DIR}:${GRPC_GATEWAY_ROOT}/third_party/googleapis" \ --plugin="${GOFAST_BIN}" ./*.proto - sed -i.bak -E 's|"etcd/api/|"go.etcd.io/etcd/api/v3/|g' ./*.pb.go + run sed -i.bak -E 's|"etcd/api/|"go.etcd.io/etcd/api/v3/|g' ./*.pb.go + run sed -i.bak -E 's|"raft/raftpb"|"go.etcd.io/etcd/raft/v3/raftpb"|g' ./*.pb.go rm -f ./*.bak run gofmt -s -w ./*.pb.go @@ -69,13 +70,13 @@ for pb in api/etcdserverpb/rpc server/etcdserver/api/v3lock/v3lockpb/v3lock serv pkg=$(basename "${pkgpath}") gwfile="${pb}.pb.gw.go" - sed -i -E "s#package $pkg#package gw#g" "${gwfile}" - sed -i -E "s#import \\(#import \\(\"go.etcd.io/etcd/${pkgpath}\"#g" "${gwfile}" - sed -i -E "s#([ (])([a-zA-Z0-9_]*(Client|Server|Request)([^(]|$))#\\1${pkg}.\\2#g" "${gwfile}" - sed -i -E "s# (New[a-zA-Z0-9_]*Client\\()# ${pkg}.\\1#g" "${gwfile}" - sed -i -E "s|go.etcd.io/etcd|go.etcd.io/etcd/v3|g" "${gwfile}" - sed -i -E "s|go.etcd.io/etcd/v3/api|go.etcd.io/etcd/api/v3|g" "${gwfile}" - sed -i -E "s|go.etcd.io/etcd/v3/server|go.etcd.io/etcd/server/v3|g" "${gwfile}" + run sed -i -E "s#package $pkg#package gw#g" "${gwfile}" + run sed -i -E "s#import \\(#import \\(\"go.etcd.io/etcd/${pkgpath}\"#g" "${gwfile}" + run sed -i -E "s#([ (])([a-zA-Z0-9_]*(Client|Server|Request)([^(]|$))#\\1${pkg}.\\2#g" "${gwfile}" + run sed -i -E "s# (New[a-zA-Z0-9_]*Client\\()# ${pkg}.\\1#g" "${gwfile}" + run sed -i -E "s|go.etcd.io/etcd|go.etcd.io/etcd/v3|g" "${gwfile}" + run sed -i -E "s|go.etcd.io/etcd/v3/api|go.etcd.io/etcd/api/v3|g" "${gwfile}" + run sed -i -E "s|go.etcd.io/etcd/v3/server|go.etcd.io/etcd/server/v3|g" "${gwfile}" run go fmt "${gwfile}" diff --git a/server/etcdserver/storage.go b/server/etcdserver/storage.go index 9fad6f483..555003642 100644 --- a/server/etcdserver/storage.go +++ b/server/etcdserver/storage.go @@ -54,8 +54,9 @@ func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage { // SaveSnap saves the snapshot file to disk and writes the WAL snapshot entry. func (st *storage) SaveSnap(snap raftpb.Snapshot) error { walsnap := walpb.Snapshot{ - Index: snap.Metadata.Index, - Term: snap.Metadata.Term, + Index: snap.Metadata.Index, + Term: snap.Metadata.Term, + ConfState: &snap.Metadata.ConfState, } // save the snapshot file before writing the snapshot to the wal. // This makes it possible for the snapshot file to become orphaned, but prevents diff --git a/server/wal/wal.go b/server/wal/wal.go index 68303cd3a..509cd9099 100644 --- a/server/wal/wal.go +++ b/server/wal/wal.go @@ -608,7 +608,6 @@ func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, erro } } snaps = snaps[:n:n] - return snaps, nil } @@ -944,6 +943,10 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error { } func (w *WAL) SaveSnapshot(e walpb.Snapshot) error { + if err := walpb.ValidateSnapshotForWrite(&e); err != nil { + return err + } + b := pbutil.MustMarshal(&e) w.mu.Lock() diff --git a/server/wal/wal_test.go b/server/wal/wal_test.go index a2dd70044..d92d578de 100644 --- a/server/wal/wal_test.go +++ b/server/wal/wal_test.go @@ -35,8 +35,15 @@ import ( "go.uber.org/zap" ) +var ( + confState = raftpb.ConfState{ + Voters: []uint64{0x00ffca74}, + AutoLeave: false, + } +) + func TestNew(t *testing.T) { - p, err := ioutil.TempDir(os.TempDir(), "waltest") + p, err := ioutil.TempDir(t.TempDir(), "waltest") if err != nil { t.Fatal(err) } @@ -90,7 +97,7 @@ func TestNew(t *testing.T) { } func TestCreateFailFromPollutedDir(t *testing.T) { - p, err := ioutil.TempDir(os.TempDir(), "waltest") + p, err := ioutil.TempDir(t.TempDir(), "waltest") if err != nil { t.Fatal(err) } @@ -104,7 +111,7 @@ func TestCreateFailFromPollutedDir(t *testing.T) { } func TestWalCleanup(t *testing.T) { - testRoot, err := ioutil.TempDir(os.TempDir(), "waltestroot") + testRoot, err := ioutil.TempDir(t.TempDir(), "waltestroot") if err != nil { t.Fatal(err) } @@ -135,7 +142,7 @@ func TestWalCleanup(t *testing.T) { } func TestCreateFailFromNoSpaceLeft(t *testing.T) { - p, err := ioutil.TempDir(os.TempDir(), "waltest") + p, err := ioutil.TempDir(t.TempDir(), "waltest") if err != nil { t.Fatal(err) } @@ -154,7 +161,7 @@ func TestCreateFailFromNoSpaceLeft(t *testing.T) { } func TestNewForInitedDir(t *testing.T) { - p, err := ioutil.TempDir(os.TempDir(), "waltest") + p, err := ioutil.TempDir(t.TempDir(), "waltest") if err != nil { t.Fatal(err) } @@ -167,7 +174,7 @@ func TestNewForInitedDir(t *testing.T) { } func TestOpenAtIndex(t *testing.T) { - dir, err := ioutil.TempDir(os.TempDir(), "waltest") + dir, err := ioutil.TempDir(t.TempDir(), "waltest") if err != nil { t.Fatal(err) } @@ -210,7 +217,7 @@ func TestOpenAtIndex(t *testing.T) { } w.Close() - emptydir, err := ioutil.TempDir(os.TempDir(), "waltestempty") + emptydir, err := ioutil.TempDir(t.TempDir(), "waltestempty") if err != nil { t.Fatal(err) } @@ -224,7 +231,7 @@ func TestOpenAtIndex(t *testing.T) { // The test creates a WAL directory and cuts out multiple WAL files. Then // it corrupts one of the files by completely truncating it. func TestVerify(t *testing.T) { - walDir, err := ioutil.TempDir(os.TempDir(), "waltest") + walDir, err := ioutil.TempDir(t.TempDir(), "waltest") if err != nil { t.Fatal(err) } @@ -273,7 +280,7 @@ func TestVerify(t *testing.T) { // TODO: split it into smaller tests for better readability func TestCut(t *testing.T) { - p, err := ioutil.TempDir(os.TempDir(), "waltest") + p, err := ioutil.TempDir(t.TempDir(), "waltest") if err != nil { t.Fatal(err) } @@ -304,7 +311,7 @@ func TestCut(t *testing.T) { if err = w.cut(); err != nil { t.Fatal(err) } - snap := walpb.Snapshot{Index: 2, Term: 1} + snap := walpb.Snapshot{Index: 2, Term: 1, ConfState: &confState} if err = w.SaveSnapshot(snap); err != nil { t.Fatal(err) } @@ -335,7 +342,7 @@ func TestCut(t *testing.T) { } func TestSaveWithCut(t *testing.T) { - p, err := ioutil.TempDir(os.TempDir(), "waltest") + p, err := ioutil.TempDir(t.TempDir(), "waltest") if err != nil { t.Fatal(err) } @@ -398,7 +405,7 @@ func TestSaveWithCut(t *testing.T) { } func TestRecover(t *testing.T) { - p, err := ioutil.TempDir(os.TempDir(), "waltest") + p, err := ioutil.TempDir(t.TempDir(), "waltest") if err != nil { t.Fatal(err) } @@ -513,7 +520,7 @@ func TestScanWalName(t *testing.T) { } func TestRecoverAfterCut(t *testing.T) { - p, err := ioutil.TempDir(os.TempDir(), "waltest") + p, err := ioutil.TempDir(t.TempDir(), "waltest") if err != nil { t.Fatal(err) } @@ -524,7 +531,7 @@ func TestRecoverAfterCut(t *testing.T) { t.Fatal(err) } for i := 0; i < 10; i++ { - if err = md.SaveSnapshot(walpb.Snapshot{Index: uint64(i)}); err != nil { + if err = md.SaveSnapshot(walpb.Snapshot{Index: uint64(i), Term: 1, ConfState: &confState}); err != nil { t.Fatal(err) } es := []raftpb.Entry{{Index: uint64(i)}} @@ -542,7 +549,7 @@ func TestRecoverAfterCut(t *testing.T) { } for i := 0; i < 10; i++ { - w, err := Open(zap.NewExample(), p, walpb.Snapshot{Index: uint64(i)}) + w, err := Open(zap.NewExample(), p, walpb.Snapshot{Index: uint64(i), Term: 1}) if err != nil { if i <= 4 { if err != ErrFileNotFound { @@ -571,7 +578,7 @@ func TestRecoverAfterCut(t *testing.T) { } func TestOpenAtUncommittedIndex(t *testing.T) { - p, err := ioutil.TempDir(os.TempDir(), "waltest") + p, err := ioutil.TempDir(t.TempDir(), "waltest") if err != nil { t.Fatal(err) } @@ -605,7 +612,7 @@ func TestOpenAtUncommittedIndex(t *testing.T) { // it releases the lock of part of data, and excepts that OpenForRead // can read out all files even if some are locked for write. func TestOpenForRead(t *testing.T) { - p, err := ioutil.TempDir(os.TempDir(), "waltest") + p, err := ioutil.TempDir(t.TempDir(), "waltest") if err != nil { t.Fatal(err) } @@ -646,7 +653,7 @@ func TestOpenForRead(t *testing.T) { } func TestOpenWithMaxIndex(t *testing.T) { - p, err := ioutil.TempDir(os.TempDir(), "waltest") + p, err := ioutil.TempDir(t.TempDir(), "waltest") if err != nil { t.Fatal(err) } @@ -689,7 +696,7 @@ func TestSaveEmpty(t *testing.T) { } func TestReleaseLockTo(t *testing.T) { - p, err := ioutil.TempDir(os.TempDir(), "waltest") + p, err := ioutil.TempDir(t.TempDir(), "waltest") if err != nil { t.Fatal(err) } @@ -761,7 +768,7 @@ func TestReleaseLockTo(t *testing.T) { // TestTailWriteNoSlackSpace ensures that tail writes append if there's no preallocated space. func TestTailWriteNoSlackSpace(t *testing.T) { - p, err := ioutil.TempDir(os.TempDir(), "waltest") + p, err := ioutil.TempDir(t.TempDir(), "waltest") if err != nil { t.Fatal(err) } @@ -827,7 +834,7 @@ func TestTailWriteNoSlackSpace(t *testing.T) { // TestRestartCreateWal ensures that an interrupted WAL initialization is clobbered on restart func TestRestartCreateWal(t *testing.T) { - p, err := ioutil.TempDir(os.TempDir(), "waltest") + p, err := ioutil.TempDir(t.TempDir(), "waltest") if err != nil { t.Fatal(err) } @@ -867,7 +874,7 @@ func TestOpenOnTornWrite(t *testing.T) { clobberIdx := 20 overwriteEntries := 5 - p, err := ioutil.TempDir(os.TempDir(), "waltest") + p, err := ioutil.TempDir(t.TempDir(), "waltest") if err != nil { t.Fatal(err) } @@ -952,7 +959,7 @@ func TestOpenOnTornWrite(t *testing.T) { } func TestRenameFail(t *testing.T) { - p, err := ioutil.TempDir(os.TempDir(), "waltest") + p, err := ioutil.TempDir(t.TempDir(), "waltest") if err != nil { t.Fatal(err) } @@ -964,7 +971,7 @@ func TestRenameFail(t *testing.T) { }() SegmentSizeBytes = math.MaxInt64 - tp, terr := ioutil.TempDir(os.TempDir(), "waltest") + tp, terr := ioutil.TempDir(t.TempDir(), "waltest") if terr != nil { t.Fatal(terr) } @@ -982,7 +989,7 @@ func TestRenameFail(t *testing.T) { // TestReadAllFail ensure ReadAll error if used without opening the WAL func TestReadAllFail(t *testing.T) { - dir, err := ioutil.TempDir(os.TempDir(), "waltest") + dir, err := ioutil.TempDir(t.TempDir(), "waltest") if err != nil { t.Fatal(err) } @@ -1004,18 +1011,18 @@ func TestReadAllFail(t *testing.T) { // TestValidSnapshotEntries ensures ValidSnapshotEntries returns all valid wal snapshot entries, accounting // for hardstate func TestValidSnapshotEntries(t *testing.T) { - p, err := ioutil.TempDir(os.TempDir(), "waltest") + p, err := ioutil.TempDir(t.TempDir(), "waltest") if err != nil { t.Fatal(err) } defer os.RemoveAll(p) - snap0 := walpb.Snapshot{Index: 0, Term: 0} - snap1 := walpb.Snapshot{Index: 1, Term: 1} + snap0 := walpb.Snapshot{} + snap1 := walpb.Snapshot{Index: 1, Term: 1, ConfState: &confState} state1 := raftpb.HardState{Commit: 1, Term: 1} - snap2 := walpb.Snapshot{Index: 2, Term: 1} - snap3 := walpb.Snapshot{Index: 3, Term: 2} + snap2 := walpb.Snapshot{Index: 2, Term: 1, ConfState: &confState} + snap3 := walpb.Snapshot{Index: 3, Term: 2, ConfState: &confState} state2 := raftpb.HardState{Commit: 3, Term: 2} - snap4 := walpb.Snapshot{Index: 4, Term: 2} // will be orphaned since the last committed entry will be snap3 + snap4 := walpb.Snapshot{Index: 4, Term: 2, ConfState: &confState} // will be orphaned since the last committed entry will be snap3 func() { w, err := Create(zap.NewExample(), p, nil) if err != nil { @@ -1061,16 +1068,16 @@ func TestValidSnapshotEntriesAfterPurgeWal(t *testing.T) { defer func() { SegmentSizeBytes = oldSegmentSizeBytes }() - p, err := ioutil.TempDir(os.TempDir(), "waltest") + p, err := ioutil.TempDir(t.TempDir(), "waltest") if err != nil { t.Fatal(err) } defer os.RemoveAll(p) - snap0 := walpb.Snapshot{Index: 0, Term: 0} - snap1 := walpb.Snapshot{Index: 1, Term: 1} + snap0 := walpb.Snapshot{} + snap1 := walpb.Snapshot{Index: 1, Term: 1, ConfState: &confState} state1 := raftpb.HardState{Commit: 1, Term: 1} - snap2 := walpb.Snapshot{Index: 2, Term: 1} - snap3 := walpb.Snapshot{Index: 3, Term: 2} + snap2 := walpb.Snapshot{Index: 2, Term: 1, ConfState: &confState} + snap3 := walpb.Snapshot{Index: 3, Term: 2, ConfState: &confState} state2 := raftpb.HardState{Commit: 3, Term: 2} func() { w, err := Create(zap.NewExample(), p, nil) diff --git a/server/wal/walpb/record.go b/server/wal/walpb/record.go index 30a05e0c1..e2070fbba 100644 --- a/server/wal/walpb/record.go +++ b/server/wal/walpb/record.go @@ -27,3 +27,15 @@ func (rec *Record) Validate(crc uint32) error { rec.Reset() return ErrCRCMismatch } + +// ValidateSnapshotForWrite ensures the Snapshot the newly written snapshot is valid. +// +// There might exist log-entries written by old etcd versions that does not conform +// to the requirements. +func ValidateSnapshotForWrite(e *Snapshot) error { + // Since etcd>=3.5.0 + if e.ConfState == nil && e.Index > 0 { + return errors.New("Saved (not-initial) snapshot is missing ConfState: " + e.String()) + } + return nil +} diff --git a/server/wal/walpb/record.pb.go b/server/wal/walpb/record.pb.go index 4eec4b219..654d82636 100644 --- a/server/wal/walpb/record.pb.go +++ b/server/wal/walpb/record.pb.go @@ -11,6 +11,7 @@ import ( _ "github.com/gogo/protobuf/gogoproto" proto "github.com/golang/protobuf/proto" + raftpb "go.etcd.io/etcd/raft/v3/raftpb" ) // Reference imports to suppress errors if they are not otherwise used. @@ -66,12 +67,15 @@ func (m *Record) XXX_DiscardUnknown() { var xxx_messageInfo_Record proto.InternalMessageInfo +// Keep in sync with raftpb.SnapshotMetadata. type Snapshot struct { - Index uint64 `protobuf:"varint,1,opt,name=index" json:"index"` - Term uint64 `protobuf:"varint,2,opt,name=term" json:"term"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Index uint64 `protobuf:"varint,1,opt,name=index" json:"index"` + Term uint64 `protobuf:"varint,2,opt,name=term" json:"term"` + // Field populated since >=etcd-3.5.0. + ConfState *raftpb.ConfState `protobuf:"bytes,3,opt,name=conf_state,json=confState" json:"conf_state,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *Snapshot) Reset() { *m = Snapshot{} } @@ -115,19 +119,22 @@ func init() { func init() { proto.RegisterFile("record.proto", fileDescriptor_bf94fd919e302a1d) } var fileDescriptor_bf94fd919e302a1d = []byte{ - // 186 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x29, 0x4a, 0x4d, 0xce, - 0x2f, 0x4a, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2d, 0x4f, 0xcc, 0x29, 0x48, 0x92, - 0x12, 0x49, 0xcf, 0x4f, 0xcf, 0x07, 0x8b, 0xe8, 0x83, 0x58, 0x10, 0x49, 0x25, 0x3f, 0x2e, 0xb6, - 0x20, 0xb0, 0x62, 0x21, 0x09, 0x2e, 0x96, 0x92, 0xca, 0x82, 0x54, 0x09, 0x46, 0x05, 0x46, 0x0d, - 0x66, 0x27, 0x96, 0x13, 0xf7, 0xe4, 0x19, 0x82, 0xc0, 0x22, 0x42, 0x62, 0x5c, 0xcc, 0xc9, 0x45, - 0xc9, 0x12, 0x4c, 0x0a, 0x8c, 0x1a, 0xbc, 0x50, 0x09, 0x90, 0x80, 0x90, 0x10, 0x17, 0x4b, 0x4a, - 0x62, 0x49, 0xa2, 0x04, 0xb3, 0x02, 0xa3, 0x06, 0x4f, 0x10, 0x98, 0xad, 0xe4, 0xc0, 0xc5, 0x11, - 0x9c, 0x97, 0x58, 0x50, 0x9c, 0x91, 0x5f, 0x22, 0x24, 0xc5, 0xc5, 0x9a, 0x99, 0x97, 0x92, 0x5a, - 0x01, 0x36, 0x92, 0x05, 0xaa, 0x13, 0x22, 0x04, 0xb6, 0x2d, 0xb5, 0x28, 0x17, 0x6c, 0x28, 0x0b, - 0xdc, 0xb6, 0xd4, 0xa2, 0x5c, 0x27, 0x91, 0x13, 0x0f, 0xe5, 0x18, 0x4e, 0x3c, 0x92, 0x63, 0xbc, - 0xf0, 0x48, 0x8e, 0xf1, 0xc1, 0x23, 0x39, 0xc6, 0x19, 0x8f, 0xe5, 0x18, 0x00, 0x01, 0x00, 0x00, - 0xff, 0xff, 0x7f, 0x5e, 0x5c, 0x46, 0xd3, 0x00, 0x00, 0x00, + // 234 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x3c, 0x8e, 0x41, 0x4e, 0xc3, 0x30, + 0x10, 0x45, 0x63, 0xe2, 0x22, 0x18, 0xca, 0x02, 0xab, 0xaa, 0xa2, 0x2c, 0x4c, 0xd4, 0x55, 0x56, + 0x29, 0xe2, 0x08, 0x65, 0xcf, 0x22, 0x3d, 0x00, 0x72, 0x1d, 0xa7, 0x20, 0xd1, 0x8c, 0x35, 0xb5, + 0x04, 0xdc, 0x84, 0x23, 0x65, 0xc9, 0x09, 0x10, 0x84, 0x8b, 0xa0, 0x8c, 0x03, 0x1b, 0xfb, 0xeb, + 0x7d, 0xf9, 0x7d, 0xc3, 0x9c, 0x9c, 0x45, 0x6a, 0x2a, 0x4f, 0x18, 0x50, 0xcd, 0x5e, 0xcc, 0xb3, + 0xdf, 0xe5, 0x8b, 0x3d, 0xee, 0x91, 0xc9, 0x7a, 0x4c, 0xb1, 0xcc, 0x97, 0x64, 0xda, 0xb0, 0x1e, + 0x0f, 0xbf, 0xe3, 0x2b, 0xf2, 0xd5, 0x3d, 0x9c, 0xd6, 0x2c, 0x51, 0x19, 0xc8, 0xf0, 0xe6, 0x5d, + 0x26, 0x0a, 0x51, 0xa6, 0x1b, 0xd9, 0x7f, 0x5e, 0x27, 0x35, 0x13, 0xb5, 0x84, 0xd4, 0x92, 0xcd, + 0x4e, 0x0a, 0x51, 0x5e, 0x4e, 0xc5, 0x08, 0x94, 0x02, 0xd9, 0x98, 0x60, 0xb2, 0xb4, 0x10, 0xe5, + 0xbc, 0xe6, 0xbc, 0x22, 0x38, 0xdb, 0x76, 0xc6, 0x1f, 0x1f, 0x31, 0xa8, 0x1c, 0x66, 0x4f, 0x5d, + 0xe3, 0x5e, 0x59, 0x29, 0xa7, 0x97, 0x11, 0xf1, 0x9a, 0xa3, 0x03, 0x4b, 0xe5, 0xff, 0x9a, 0xa3, + 0x83, 0xba, 0x01, 0xb0, 0xd8, 0xb5, 0x0f, 0xc7, 0x60, 0x82, 0x63, 0xf7, 0xc5, 0xed, 0x55, 0x15, + 0x7f, 0x5e, 0xdd, 0x61, 0xd7, 0x6e, 0xc7, 0xa2, 0x3e, 0xb7, 0x7f, 0x71, 0xb3, 0xe8, 0xbf, 0x75, + 0xd2, 0x0f, 0x5a, 0x7c, 0x0c, 0x5a, 0x7c, 0x0d, 0x5a, 0xbc, 0xff, 0xe8, 0xe4, 0x37, 0x00, 0x00, + 0xff, 0xff, 0xc3, 0x36, 0x0c, 0xad, 0x1d, 0x01, 0x00, 0x00, } func (m *Record) Marshal() (dAtA []byte, err error) { @@ -194,6 +201,18 @@ func (m *Snapshot) MarshalToSizedBuffer(dAtA []byte) (int, error) { i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } + if m.ConfState != nil { + { + size, err := m.ConfState.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRecord(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } i = encodeVarintRecord(dAtA, i, uint64(m.Term)) i-- dAtA[i] = 0x10 @@ -240,6 +259,10 @@ func (m *Snapshot) Size() (n int) { _ = l n += 1 + sovRecord(uint64(m.Index)) n += 1 + sovRecord(uint64(m.Term)) + if m.ConfState != nil { + l = m.ConfState.Size() + n += 1 + l + sovRecord(uint64(l)) + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -442,6 +465,42 @@ func (m *Snapshot) Unmarshal(dAtA []byte) error { break } } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ConfState", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRecord + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRecord + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRecord + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.ConfState == nil { + m.ConfState = &raftpb.ConfState{} + } + if err := m.ConfState.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRecord(dAtA[iNdEx:]) diff --git a/server/wal/walpb/record.proto b/server/wal/walpb/record.proto index b694cb233..536fa6c19 100644 --- a/server/wal/walpb/record.proto +++ b/server/wal/walpb/record.proto @@ -2,6 +2,7 @@ syntax = "proto2"; package walpb; import "gogoproto/gogo.proto"; +import "raft/raftpb/raft.proto"; option (gogoproto.marshaler_all) = true; option (gogoproto.sizer_all) = true; @@ -14,7 +15,10 @@ message Record { optional bytes data = 3; } +// Keep in sync with raftpb.SnapshotMetadata. message Snapshot { optional uint64 index = 1 [(gogoproto.nullable) = false]; optional uint64 term = 2 [(gogoproto.nullable) = false]; + // Field populated since >=etcd-3.5.0. + optional raftpb.ConfState conf_state = 3; } diff --git a/server/wal/walpb/record_test.go b/server/wal/walpb/record_test.go new file mode 100644 index 000000000..829413635 --- /dev/null +++ b/server/wal/walpb/record_test.go @@ -0,0 +1,36 @@ +package walpb + +import ( + "testing" + + "github.com/golang/protobuf/descriptor" + "go.etcd.io/etcd/raft/v3/raftpb" +) + +func TestSnapshotMetadataCompatibility(t *testing.T) { + _, snapshotMetadataMd := descriptor.ForMessage(&raftpb.SnapshotMetadata{}) + _, snapshotMd := descriptor.ForMessage(&Snapshot{}) + if len(snapshotMetadataMd.GetField()) != len(snapshotMd.GetField()) { + t.Errorf("Different number of fields in raftpb.SnapshotMetadata vs. walpb.Snapshot. " + + "They are supposed to be in sync.") + } +} + +func TestValidateSnapshot(t *testing.T) { + tests := []struct { + name string + snap *Snapshot + wantErr bool + }{ + {name: "empty", snap: &Snapshot{}, wantErr: false}, + {name: "invalid", snap: &Snapshot{Index: 5, Term: 3}, wantErr: true}, + {name: "valid", snap: &Snapshot{Index: 5, Term: 3, ConfState: &raftpb.ConfState{Voters: []uint64{0x00cad1}}}, wantErr: false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if err := ValidateSnapshotForWrite(tt.snap); (err != nil) != tt.wantErr { + t.Errorf("ValidateSnapshotForWrite() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +}