Merge pull request #12735 from ptabor/20210227-reduce-store-v2-clean

no-store_v2:  Store ConfState as part of WAL log snapshot
This commit is contained in:
Piotr Tabor 2021-03-25 08:23:00 +01:00 committed by GitHub
commit 8ee1dd9e23
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 202 additions and 75 deletions

View File

@ -117,8 +117,9 @@ func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte,
func (rc *raftNode) saveSnap(snap raftpb.Snapshot) error {
walSnap := walpb.Snapshot{
Index: snap.Metadata.Index,
Term: snap.Metadata.Term,
Index: snap.Metadata.Index,
Term: snap.Metadata.Term,
ConfState: &snap.Metadata.ConfState,
}
// save the snapshot file before writing the snapshot to the wal.
// This makes it possible for the snapshot file to become orphaned, but prevents

View File

@ -109,7 +109,7 @@ func saveSnap(destSnap, srcSnap string) (walsnap walpb.Snapshot) {
log.Fatal(err)
}
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
walsnap.Index, walsnap.Term, walsnap.ConfState = snapshot.Metadata.Index, snapshot.Metadata.Term, &snapshot.Metadata.ConfState
newss := snap.New(zap.NewExample(), destSnap)
if err = newss.SaveSnap(*snapshot); err != nil {
log.Fatal(err)

View File

@ -387,6 +387,8 @@ func (s *v3Manager) saveDB() error {
}
// saveWALAndSnap creates a WAL for the initial cluster
//
// TODO: This code ignores learners !!!
func (s *v3Manager) saveWALAndSnap() error {
if err := fileutil.CreateDirAll(s.walDir); err != nil {
return err
@ -454,19 +456,20 @@ func (s *v3Manager) saveWALAndSnap() error {
if berr != nil {
return berr
}
confState := raftpb.ConfState{
Voters: nodeIDs,
}
raftSnap := raftpb.Snapshot{
Data: b,
Metadata: raftpb.SnapshotMetadata{
Index: commit,
Term: term,
ConfState: raftpb.ConfState{
Voters: nodeIDs,
},
Index: commit,
Term: term,
ConfState: confState,
},
}
sn := snap.New(s.lg, s.snapDir)
if err := sn.SaveSnap(raftSnap); err != nil {
return err
}
return w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term})
return w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term, ConfState: &confState})
}

View File

@ -39,10 +39,11 @@ log_callout -e "\\nRunning gofast (gogo) proto generation..."
for dir in ${DIRS}; do
run pushd "${dir}"
run protoc --gofast_out=plugins=grpc:. -I=".:${GOGOPROTO_PATH}:${ETCD_ROOT_DIR}/..:${GRPC_GATEWAY_ROOT}/third_party/googleapis" \
run protoc --gofast_out=plugins=grpc:. -I=".:${GOGOPROTO_PATH}:${ETCD_ROOT_DIR}/..:${ETCD_ROOT_DIR}:${GRPC_GATEWAY_ROOT}/third_party/googleapis" \
--plugin="${GOFAST_BIN}" ./*.proto
sed -i.bak -E 's|"etcd/api/|"go.etcd.io/etcd/api/v3/|g' ./*.pb.go
run sed -i.bak -E 's|"etcd/api/|"go.etcd.io/etcd/api/v3/|g' ./*.pb.go
run sed -i.bak -E 's|"raft/raftpb"|"go.etcd.io/etcd/raft/v3/raftpb"|g' ./*.pb.go
rm -f ./*.bak
run gofmt -s -w ./*.pb.go
@ -69,13 +70,13 @@ for pb in api/etcdserverpb/rpc server/etcdserver/api/v3lock/v3lockpb/v3lock serv
pkg=$(basename "${pkgpath}")
gwfile="${pb}.pb.gw.go"
sed -i -E "s#package $pkg#package gw#g" "${gwfile}"
sed -i -E "s#import \\(#import \\(\"go.etcd.io/etcd/${pkgpath}\"#g" "${gwfile}"
sed -i -E "s#([ (])([a-zA-Z0-9_]*(Client|Server|Request)([^(]|$))#\\1${pkg}.\\2#g" "${gwfile}"
sed -i -E "s# (New[a-zA-Z0-9_]*Client\\()# ${pkg}.\\1#g" "${gwfile}"
sed -i -E "s|go.etcd.io/etcd|go.etcd.io/etcd/v3|g" "${gwfile}"
sed -i -E "s|go.etcd.io/etcd/v3/api|go.etcd.io/etcd/api/v3|g" "${gwfile}"
sed -i -E "s|go.etcd.io/etcd/v3/server|go.etcd.io/etcd/server/v3|g" "${gwfile}"
run sed -i -E "s#package $pkg#package gw#g" "${gwfile}"
run sed -i -E "s#import \\(#import \\(\"go.etcd.io/etcd/${pkgpath}\"#g" "${gwfile}"
run sed -i -E "s#([ (])([a-zA-Z0-9_]*(Client|Server|Request)([^(]|$))#\\1${pkg}.\\2#g" "${gwfile}"
run sed -i -E "s# (New[a-zA-Z0-9_]*Client\\()# ${pkg}.\\1#g" "${gwfile}"
run sed -i -E "s|go.etcd.io/etcd|go.etcd.io/etcd/v3|g" "${gwfile}"
run sed -i -E "s|go.etcd.io/etcd/v3/api|go.etcd.io/etcd/api/v3|g" "${gwfile}"
run sed -i -E "s|go.etcd.io/etcd/v3/server|go.etcd.io/etcd/server/v3|g" "${gwfile}"
run go fmt "${gwfile}"

View File

@ -54,8 +54,9 @@ func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
// SaveSnap saves the snapshot file to disk and writes the WAL snapshot entry.
func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
walsnap := walpb.Snapshot{
Index: snap.Metadata.Index,
Term: snap.Metadata.Term,
Index: snap.Metadata.Index,
Term: snap.Metadata.Term,
ConfState: &snap.Metadata.ConfState,
}
// save the snapshot file before writing the snapshot to the wal.
// This makes it possible for the snapshot file to become orphaned, but prevents

View File

@ -608,7 +608,6 @@ func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, erro
}
}
snaps = snaps[:n:n]
return snaps, nil
}
@ -944,6 +943,10 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
}
func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
if err := walpb.ValidateSnapshotForWrite(&e); err != nil {
return err
}
b := pbutil.MustMarshal(&e)
w.mu.Lock()

View File

@ -35,8 +35,15 @@ import (
"go.uber.org/zap"
)
var (
confState = raftpb.ConfState{
Voters: []uint64{0x00ffca74},
AutoLeave: false,
}
)
func TestNew(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -90,7 +97,7 @@ func TestNew(t *testing.T) {
}
func TestCreateFailFromPollutedDir(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -104,7 +111,7 @@ func TestCreateFailFromPollutedDir(t *testing.T) {
}
func TestWalCleanup(t *testing.T) {
testRoot, err := ioutil.TempDir(os.TempDir(), "waltestroot")
testRoot, err := ioutil.TempDir(t.TempDir(), "waltestroot")
if err != nil {
t.Fatal(err)
}
@ -135,7 +142,7 @@ func TestWalCleanup(t *testing.T) {
}
func TestCreateFailFromNoSpaceLeft(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -154,7 +161,7 @@ func TestCreateFailFromNoSpaceLeft(t *testing.T) {
}
func TestNewForInitedDir(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -167,7 +174,7 @@ func TestNewForInitedDir(t *testing.T) {
}
func TestOpenAtIndex(t *testing.T) {
dir, err := ioutil.TempDir(os.TempDir(), "waltest")
dir, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -210,7 +217,7 @@ func TestOpenAtIndex(t *testing.T) {
}
w.Close()
emptydir, err := ioutil.TempDir(os.TempDir(), "waltestempty")
emptydir, err := ioutil.TempDir(t.TempDir(), "waltestempty")
if err != nil {
t.Fatal(err)
}
@ -224,7 +231,7 @@ func TestOpenAtIndex(t *testing.T) {
// The test creates a WAL directory and cuts out multiple WAL files. Then
// it corrupts one of the files by completely truncating it.
func TestVerify(t *testing.T) {
walDir, err := ioutil.TempDir(os.TempDir(), "waltest")
walDir, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -273,7 +280,7 @@ func TestVerify(t *testing.T) {
// TODO: split it into smaller tests for better readability
func TestCut(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -304,7 +311,7 @@ func TestCut(t *testing.T) {
if err = w.cut(); err != nil {
t.Fatal(err)
}
snap := walpb.Snapshot{Index: 2, Term: 1}
snap := walpb.Snapshot{Index: 2, Term: 1, ConfState: &confState}
if err = w.SaveSnapshot(snap); err != nil {
t.Fatal(err)
}
@ -335,7 +342,7 @@ func TestCut(t *testing.T) {
}
func TestSaveWithCut(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -398,7 +405,7 @@ func TestSaveWithCut(t *testing.T) {
}
func TestRecover(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -513,7 +520,7 @@ func TestScanWalName(t *testing.T) {
}
func TestRecoverAfterCut(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -524,7 +531,7 @@ func TestRecoverAfterCut(t *testing.T) {
t.Fatal(err)
}
for i := 0; i < 10; i++ {
if err = md.SaveSnapshot(walpb.Snapshot{Index: uint64(i)}); err != nil {
if err = md.SaveSnapshot(walpb.Snapshot{Index: uint64(i), Term: 1, ConfState: &confState}); err != nil {
t.Fatal(err)
}
es := []raftpb.Entry{{Index: uint64(i)}}
@ -542,7 +549,7 @@ func TestRecoverAfterCut(t *testing.T) {
}
for i := 0; i < 10; i++ {
w, err := Open(zap.NewExample(), p, walpb.Snapshot{Index: uint64(i)})
w, err := Open(zap.NewExample(), p, walpb.Snapshot{Index: uint64(i), Term: 1})
if err != nil {
if i <= 4 {
if err != ErrFileNotFound {
@ -571,7 +578,7 @@ func TestRecoverAfterCut(t *testing.T) {
}
func TestOpenAtUncommittedIndex(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -605,7 +612,7 @@ func TestOpenAtUncommittedIndex(t *testing.T) {
// it releases the lock of part of data, and excepts that OpenForRead
// can read out all files even if some are locked for write.
func TestOpenForRead(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -646,7 +653,7 @@ func TestOpenForRead(t *testing.T) {
}
func TestOpenWithMaxIndex(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -689,7 +696,7 @@ func TestSaveEmpty(t *testing.T) {
}
func TestReleaseLockTo(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -761,7 +768,7 @@ func TestReleaseLockTo(t *testing.T) {
// TestTailWriteNoSlackSpace ensures that tail writes append if there's no preallocated space.
func TestTailWriteNoSlackSpace(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -827,7 +834,7 @@ func TestTailWriteNoSlackSpace(t *testing.T) {
// TestRestartCreateWal ensures that an interrupted WAL initialization is clobbered on restart
func TestRestartCreateWal(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -867,7 +874,7 @@ func TestOpenOnTornWrite(t *testing.T) {
clobberIdx := 20
overwriteEntries := 5
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -952,7 +959,7 @@ func TestOpenOnTornWrite(t *testing.T) {
}
func TestRenameFail(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -964,7 +971,7 @@ func TestRenameFail(t *testing.T) {
}()
SegmentSizeBytes = math.MaxInt64
tp, terr := ioutil.TempDir(os.TempDir(), "waltest")
tp, terr := ioutil.TempDir(t.TempDir(), "waltest")
if terr != nil {
t.Fatal(terr)
}
@ -982,7 +989,7 @@ func TestRenameFail(t *testing.T) {
// TestReadAllFail ensure ReadAll error if used without opening the WAL
func TestReadAllFail(t *testing.T) {
dir, err := ioutil.TempDir(os.TempDir(), "waltest")
dir, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
@ -1004,18 +1011,18 @@ func TestReadAllFail(t *testing.T) {
// TestValidSnapshotEntries ensures ValidSnapshotEntries returns all valid wal snapshot entries, accounting
// for hardstate
func TestValidSnapshotEntries(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(p)
snap0 := walpb.Snapshot{Index: 0, Term: 0}
snap1 := walpb.Snapshot{Index: 1, Term: 1}
snap0 := walpb.Snapshot{}
snap1 := walpb.Snapshot{Index: 1, Term: 1, ConfState: &confState}
state1 := raftpb.HardState{Commit: 1, Term: 1}
snap2 := walpb.Snapshot{Index: 2, Term: 1}
snap3 := walpb.Snapshot{Index: 3, Term: 2}
snap2 := walpb.Snapshot{Index: 2, Term: 1, ConfState: &confState}
snap3 := walpb.Snapshot{Index: 3, Term: 2, ConfState: &confState}
state2 := raftpb.HardState{Commit: 3, Term: 2}
snap4 := walpb.Snapshot{Index: 4, Term: 2} // will be orphaned since the last committed entry will be snap3
snap4 := walpb.Snapshot{Index: 4, Term: 2, ConfState: &confState} // will be orphaned since the last committed entry will be snap3
func() {
w, err := Create(zap.NewExample(), p, nil)
if err != nil {
@ -1061,16 +1068,16 @@ func TestValidSnapshotEntriesAfterPurgeWal(t *testing.T) {
defer func() {
SegmentSizeBytes = oldSegmentSizeBytes
}()
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(p)
snap0 := walpb.Snapshot{Index: 0, Term: 0}
snap1 := walpb.Snapshot{Index: 1, Term: 1}
snap0 := walpb.Snapshot{}
snap1 := walpb.Snapshot{Index: 1, Term: 1, ConfState: &confState}
state1 := raftpb.HardState{Commit: 1, Term: 1}
snap2 := walpb.Snapshot{Index: 2, Term: 1}
snap3 := walpb.Snapshot{Index: 3, Term: 2}
snap2 := walpb.Snapshot{Index: 2, Term: 1, ConfState: &confState}
snap3 := walpb.Snapshot{Index: 3, Term: 2, ConfState: &confState}
state2 := raftpb.HardState{Commit: 3, Term: 2}
func() {
w, err := Create(zap.NewExample(), p, nil)

View File

@ -27,3 +27,15 @@ func (rec *Record) Validate(crc uint32) error {
rec.Reset()
return ErrCRCMismatch
}
// ValidateSnapshotForWrite ensures the Snapshot the newly written snapshot is valid.
//
// There might exist log-entries written by old etcd versions that does not conform
// to the requirements.
func ValidateSnapshotForWrite(e *Snapshot) error {
// Since etcd>=3.5.0
if e.ConfState == nil && e.Index > 0 {
return errors.New("Saved (not-initial) snapshot is missing ConfState: " + e.String())
}
return nil
}

View File

@ -11,6 +11,7 @@ import (
_ "github.com/gogo/protobuf/gogoproto"
proto "github.com/golang/protobuf/proto"
raftpb "go.etcd.io/etcd/raft/v3/raftpb"
)
// Reference imports to suppress errors if they are not otherwise used.
@ -66,12 +67,15 @@ func (m *Record) XXX_DiscardUnknown() {
var xxx_messageInfo_Record proto.InternalMessageInfo
// Keep in sync with raftpb.SnapshotMetadata.
type Snapshot struct {
Index uint64 `protobuf:"varint,1,opt,name=index" json:"index"`
Term uint64 `protobuf:"varint,2,opt,name=term" json:"term"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
Index uint64 `protobuf:"varint,1,opt,name=index" json:"index"`
Term uint64 `protobuf:"varint,2,opt,name=term" json:"term"`
// Field populated since >=etcd-3.5.0.
ConfState *raftpb.ConfState `protobuf:"bytes,3,opt,name=conf_state,json=confState" json:"conf_state,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Snapshot) Reset() { *m = Snapshot{} }
@ -115,19 +119,22 @@ func init() {
func init() { proto.RegisterFile("record.proto", fileDescriptor_bf94fd919e302a1d) }
var fileDescriptor_bf94fd919e302a1d = []byte{
// 186 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x29, 0x4a, 0x4d, 0xce,
0x2f, 0x4a, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2d, 0x4f, 0xcc, 0x29, 0x48, 0x92,
0x12, 0x49, 0xcf, 0x4f, 0xcf, 0x07, 0x8b, 0xe8, 0x83, 0x58, 0x10, 0x49, 0x25, 0x3f, 0x2e, 0xb6,
0x20, 0xb0, 0x62, 0x21, 0x09, 0x2e, 0x96, 0x92, 0xca, 0x82, 0x54, 0x09, 0x46, 0x05, 0x46, 0x0d,
0x66, 0x27, 0x96, 0x13, 0xf7, 0xe4, 0x19, 0x82, 0xc0, 0x22, 0x42, 0x62, 0x5c, 0xcc, 0xc9, 0x45,
0xc9, 0x12, 0x4c, 0x0a, 0x8c, 0x1a, 0xbc, 0x50, 0x09, 0x90, 0x80, 0x90, 0x10, 0x17, 0x4b, 0x4a,
0x62, 0x49, 0xa2, 0x04, 0xb3, 0x02, 0xa3, 0x06, 0x4f, 0x10, 0x98, 0xad, 0xe4, 0xc0, 0xc5, 0x11,
0x9c, 0x97, 0x58, 0x50, 0x9c, 0x91, 0x5f, 0x22, 0x24, 0xc5, 0xc5, 0x9a, 0x99, 0x97, 0x92, 0x5a,
0x01, 0x36, 0x92, 0x05, 0xaa, 0x13, 0x22, 0x04, 0xb6, 0x2d, 0xb5, 0x28, 0x17, 0x6c, 0x28, 0x0b,
0xdc, 0xb6, 0xd4, 0xa2, 0x5c, 0x27, 0x91, 0x13, 0x0f, 0xe5, 0x18, 0x4e, 0x3c, 0x92, 0x63, 0xbc,
0xf0, 0x48, 0x8e, 0xf1, 0xc1, 0x23, 0x39, 0xc6, 0x19, 0x8f, 0xe5, 0x18, 0x00, 0x01, 0x00, 0x00,
0xff, 0xff, 0x7f, 0x5e, 0x5c, 0x46, 0xd3, 0x00, 0x00, 0x00,
// 234 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x3c, 0x8e, 0x41, 0x4e, 0xc3, 0x30,
0x10, 0x45, 0x63, 0xe2, 0x22, 0x18, 0xca, 0x02, 0xab, 0xaa, 0xa2, 0x2c, 0x4c, 0xd4, 0x55, 0x56,
0x29, 0xe2, 0x08, 0x65, 0xcf, 0x22, 0x3d, 0x00, 0x72, 0x1d, 0xa7, 0x20, 0xd1, 0x8c, 0x35, 0xb5,
0x04, 0xdc, 0x84, 0x23, 0x65, 0xc9, 0x09, 0x10, 0x84, 0x8b, 0xa0, 0x8c, 0x03, 0x1b, 0xfb, 0xeb,
0x7d, 0xf9, 0x7d, 0xc3, 0x9c, 0x9c, 0x45, 0x6a, 0x2a, 0x4f, 0x18, 0x50, 0xcd, 0x5e, 0xcc, 0xb3,
0xdf, 0xe5, 0x8b, 0x3d, 0xee, 0x91, 0xc9, 0x7a, 0x4c, 0xb1, 0xcc, 0x97, 0x64, 0xda, 0xb0, 0x1e,
0x0f, 0xbf, 0xe3, 0x2b, 0xf2, 0xd5, 0x3d, 0x9c, 0xd6, 0x2c, 0x51, 0x19, 0xc8, 0xf0, 0xe6, 0x5d,
0x26, 0x0a, 0x51, 0xa6, 0x1b, 0xd9, 0x7f, 0x5e, 0x27, 0x35, 0x13, 0xb5, 0x84, 0xd4, 0x92, 0xcd,
0x4e, 0x0a, 0x51, 0x5e, 0x4e, 0xc5, 0x08, 0x94, 0x02, 0xd9, 0x98, 0x60, 0xb2, 0xb4, 0x10, 0xe5,
0xbc, 0xe6, 0xbc, 0x22, 0x38, 0xdb, 0x76, 0xc6, 0x1f, 0x1f, 0x31, 0xa8, 0x1c, 0x66, 0x4f, 0x5d,
0xe3, 0x5e, 0x59, 0x29, 0xa7, 0x97, 0x11, 0xf1, 0x9a, 0xa3, 0x03, 0x4b, 0xe5, 0xff, 0x9a, 0xa3,
0x83, 0xba, 0x01, 0xb0, 0xd8, 0xb5, 0x0f, 0xc7, 0x60, 0x82, 0x63, 0xf7, 0xc5, 0xed, 0x55, 0x15,
0x7f, 0x5e, 0xdd, 0x61, 0xd7, 0x6e, 0xc7, 0xa2, 0x3e, 0xb7, 0x7f, 0x71, 0xb3, 0xe8, 0xbf, 0x75,
0xd2, 0x0f, 0x5a, 0x7c, 0x0c, 0x5a, 0x7c, 0x0d, 0x5a, 0xbc, 0xff, 0xe8, 0xe4, 0x37, 0x00, 0x00,
0xff, 0xff, 0xc3, 0x36, 0x0c, 0xad, 0x1d, 0x01, 0x00, 0x00,
}
func (m *Record) Marshal() (dAtA []byte, err error) {
@ -194,6 +201,18 @@ func (m *Snapshot) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.ConfState != nil {
{
size, err := m.ConfState.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintRecord(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x1a
}
i = encodeVarintRecord(dAtA, i, uint64(m.Term))
i--
dAtA[i] = 0x10
@ -240,6 +259,10 @@ func (m *Snapshot) Size() (n int) {
_ = l
n += 1 + sovRecord(uint64(m.Index))
n += 1 + sovRecord(uint64(m.Term))
if m.ConfState != nil {
l = m.ConfState.Size()
n += 1 + l + sovRecord(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
@ -442,6 +465,42 @@ func (m *Snapshot) Unmarshal(dAtA []byte) error {
break
}
}
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field ConfState", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRecord
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthRecord
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthRecord
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.ConfState == nil {
m.ConfState = &raftpb.ConfState{}
}
if err := m.ConfState.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipRecord(dAtA[iNdEx:])

View File

@ -2,6 +2,7 @@ syntax = "proto2";
package walpb;
import "gogoproto/gogo.proto";
import "raft/raftpb/raft.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
@ -14,7 +15,10 @@ message Record {
optional bytes data = 3;
}
// Keep in sync with raftpb.SnapshotMetadata.
message Snapshot {
optional uint64 index = 1 [(gogoproto.nullable) = false];
optional uint64 term = 2 [(gogoproto.nullable) = false];
// Field populated since >=etcd-3.5.0.
optional raftpb.ConfState conf_state = 3;
}

View File

@ -0,0 +1,36 @@
package walpb
import (
"testing"
"github.com/golang/protobuf/descriptor"
"go.etcd.io/etcd/raft/v3/raftpb"
)
func TestSnapshotMetadataCompatibility(t *testing.T) {
_, snapshotMetadataMd := descriptor.ForMessage(&raftpb.SnapshotMetadata{})
_, snapshotMd := descriptor.ForMessage(&Snapshot{})
if len(snapshotMetadataMd.GetField()) != len(snapshotMd.GetField()) {
t.Errorf("Different number of fields in raftpb.SnapshotMetadata vs. walpb.Snapshot. " +
"They are supposed to be in sync.")
}
}
func TestValidateSnapshot(t *testing.T) {
tests := []struct {
name string
snap *Snapshot
wantErr bool
}{
{name: "empty", snap: &Snapshot{}, wantErr: false},
{name: "invalid", snap: &Snapshot{Index: 5, Term: 3}, wantErr: true},
{name: "valid", snap: &Snapshot{Index: 5, Term: 3, ConfState: &raftpb.ConfState{Voters: []uint64{0x00cad1}}}, wantErr: false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := ValidateSnapshotForWrite(tt.snap); (err != nil) != tt.wantErr {
t.Errorf("ValidateSnapshotForWrite() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}