Merge pull request #16418 from geetasg/pr7

Update to generate v2 snapshot from v3 state
This commit is contained in:
Marek Siarkowicz 2023-08-22 10:24:53 +02:00 committed by GitHub
commit f3cc759afc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 15 additions and 31 deletions

View File

@ -2064,7 +2064,6 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
// TODO: non-blocking snapshot // TODO: non-blocking snapshot
func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
clone := s.v2store.Clone()
// commit kv to write metadata (for example: consistent index) to disk. // commit kv to write metadata (for example: consistent index) to disk.
// //
// This guarantees that Backend's consistent_index is >= index of last snapshot. // This guarantees that Backend's consistent_index is >= index of last snapshot.
@ -2075,16 +2074,12 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
// So KV().Commit() cannot run in parallel with toApply. It has to be called outside // So KV().Commit() cannot run in parallel with toApply. It has to be called outside
// the go routine created below. // the go routine created below.
s.KV().Commit() s.KV().Commit()
d := GetMembershipInfoInV2Format(s.Logger(), s.cluster)
s.GoAttach(func() { s.GoAttach(func() {
lg := s.Logger() lg := s.Logger()
d, err := clone.SaveNoCopy() // For backward compatibility, generate v2 snapshot from v3 state.
// TODO: current store will never fail to do a snapshot
// what should we do if the store might fail?
if err != nil {
lg.Panic("failed to save v2 store", zap.Error(err))
}
snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d) snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
if err != nil { if err != nil {
// the snapshot was done asynchronously with the progress of raft. // the snapshot was done asynchronously with the progress of raft.

View File

@ -1047,7 +1047,10 @@ func TestSnapshot(t *testing.T) {
srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{}) srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
srv.be = be srv.be = be
ch := make(chan struct{}, 2) cl := membership.NewCluster(zaptest.NewLogger(t))
srv.cluster = cl
ch := make(chan struct{}, 1)
go func() { go func() {
gaction, _ := p.Wait(2) gaction, _ := p.Wait(2)
@ -1066,24 +1069,11 @@ func TestSnapshot(t *testing.T) {
} }
}() }()
go func() {
gaction, _ := st.Wait(2)
defer func() { ch <- struct{}{} }()
if len(gaction) != 2 {
t.Errorf("len(action) = %d, want 2", len(gaction))
}
if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "Clone"}) {
t.Errorf("action = %s, want Clone", gaction[0])
}
if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "SaveNoCopy"}) {
t.Errorf("action = %s, want SaveNoCopy", gaction[1])
}
}()
srv.snapshot(1, raftpb.ConfState{Voters: []uint64{1}}) srv.snapshot(1, raftpb.ConfState{Voters: []uint64{1}})
<-ch <-ch
<-ch if len(st.Action()) != 0 {
t.Errorf("no action expected on v2store. Got %d actions", len(st.Action()))
}
} }
// TestSnapshotOrdering ensures raft persists snapshot onto disk before // TestSnapshotOrdering ensures raft persists snapshot onto disk before
@ -1098,7 +1088,8 @@ func TestSnapshotOrdering(t *testing.T) {
n := newNopReadyNode() n := newNopReadyNode()
st := v2store.New() st := v2store.New()
cl := membership.NewCluster(lg) cl := membership.NewCluster(lg)
cl.SetStore(st) be, _ := betesting.NewDefaultTmpBackend(t)
cl.SetBackend(schema.NewMembershipBackend(lg, be))
testdir := t.TempDir() testdir := t.TempDir()
@ -1118,7 +1109,6 @@ func TestSnapshotOrdering(t *testing.T) {
storage: p, storage: p,
raftStorage: rs, raftStorage: rs,
}) })
be, _ := betesting.NewDefaultTmpBackend(t)
ci := cindex.NewConsistentIndex(be) ci := cindex.NewConsistentIndex(be)
s := &EtcdServer{ s := &EtcdServer{
lgMu: new(sync.RWMutex), lgMu: new(sync.RWMutex),
@ -1211,6 +1201,9 @@ func TestTriggerSnap(t *testing.T) {
srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{}) srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
srv.be = be srv.be = be
cl := membership.NewCluster(zaptest.NewLogger(t))
srv.cluster = cl
srv.start() srv.start()
donec := make(chan struct{}) donec := make(chan struct{})

View File

@ -31,11 +31,7 @@ import (
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message { func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message {
lg := s.Logger() lg := s.Logger()
// get a snapshot of v2 store as []byte // get a snapshot of v2 store as []byte
clone := s.v2store.Clone() d := GetMembershipInfoInV2Format(lg, s.cluster)
d, err := clone.SaveNoCopy()
if err != nil {
lg.Panic("failed to save v2 store data", zap.Error(err))
}
// commit kv to write metadata(for example: consistent index). // commit kv to write metadata(for example: consistent index).
s.KV().Commit() s.KV().Commit()