diff --git a/etcdserver/server.go b/etcdserver/server.go index 8f4858400..3cefd27ff 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -432,7 +432,7 @@ func (s *EtcdServer) run() { if appliedi-snapi > s.r.snapCount { log.Printf("etcdserver: start to snapshot (applied: %d, lastsnap: %d)", appliedi, snapi) - s.snapshot(appliedi, &confState) + s.snapshot(appliedi, confState) snapi = appliedi } case <-syncC: @@ -814,36 +814,40 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con } // TODO: non-blocking snapshot -func (s *EtcdServer) snapshot(snapi uint64, confState *raftpb.ConfState) { - d, err := s.store.Save() - // TODO: current store will never fail to do a snapshot - // what should we do if the store might fail? - if err != nil { - log.Panicf("etcdserver: store save should never fail: %v", err) - } - snap, err := s.r.raftStorage.CreateSnapshot(snapi, confState, d) - if err != nil { - // the snapshot was done asynchronously with the progress of raft. - // raft might have already got a newer snapshot. - if err == raft.ErrSnapOutOfDate { - return - } - log.Panicf("etcdserver: unexpected create snapshot error %v", err) - } - if err := s.r.storage.SaveSnap(snap); err != nil { - log.Fatalf("etcdserver: save snapshot error: %v", err) - } +func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { + clone := s.store.Clone() - err = s.r.raftStorage.Compact(snapi) - if err != nil { - // the compaction was done asynchronously with the progress of raft. - // raft log might already been compact. - if err == raft.ErrCompacted { - return + go func() { + d, err := clone.SaveNoCopy() + // TODO: current store will never fail to do a snapshot + // what should we do if the store might fail? + if err != nil { + log.Panicf("etcdserver: store save should never fail: %v", err) } - log.Panicf("etcdserver: unexpected compaction error %v", err) - } - log.Printf("etcdserver: saved snapshot at index %d", snap.Metadata.Index) + snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d) + if err != nil { + // the snapshot was done asynchronously with the progress of raft. + // raft might have already got a newer snapshot. + if err == raft.ErrSnapOutOfDate { + return + } + log.Panicf("etcdserver: unexpected create snapshot error %v", err) + } + if err := s.r.storage.SaveSnap(snap); err != nil { + log.Fatalf("etcdserver: save snapshot error: %v", err) + } + + err = s.r.raftStorage.Compact(snapi) + if err != nil { + // the compaction was done asynchronously with the progress of raft. + // raft log might already been compact. + if err == raft.ErrCompacted { + return + } + log.Panicf("etcdserver: unexpected compaction error %v", err) + } + log.Printf("etcdserver: saved snapshot at index %d", snap.Metadata.Index) + }() } func (s *EtcdServer) PauseSending() { s.r.pauseSending() } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 5e73206a8..063f05328 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -711,13 +711,17 @@ func TestSnapshot(t *testing.T) { }, store: st, } - srv.snapshot(1, &raftpb.ConfState{Nodes: []uint64{1}}) + srv.snapshot(1, raftpb.ConfState{Nodes: []uint64{1}}) + testutil.ForceGosched() gaction := st.Action() - if len(gaction) != 1 { + if len(gaction) != 2 { t.Fatalf("len(action) = %d, want 1", len(gaction)) } - if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "Save"}) { - t.Errorf("action = %s, want Save", gaction[0]) + if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "Clone"}) { + t.Errorf("action = %s, want Clone", gaction[0]) + } + if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "SaveNoCopy"}) { + t.Errorf("action = %s, want SaveNoCopy", gaction[1]) } gaction = p.Action() if len(gaction) != 1 { @@ -1207,6 +1211,17 @@ func (s *storeRecorder) Recovery(b []byte) error { s.Record(testutil.Action{Name: "Recovery"}) return nil } + +func (s *storeRecorder) SaveNoCopy() ([]byte, error) { + s.Record(testutil.Action{Name: "SaveNoCopy"}) + return nil, nil +} + +func (s *storeRecorder) Clone() store.Store { + s.Record(testutil.Action{Name: "Clone"}) + return s +} + func (s *storeRecorder) JsonStats() []byte { return nil } func (s *storeRecorder) DeleteExpiredKeys(cutoff time.Time) { s.Record(testutil.Action{ diff --git a/store/store.go b/store/store.go index 7024703c0..dea54690d 100644 --- a/store/store.go +++ b/store/store.go @@ -56,6 +56,9 @@ type Store interface { Save() ([]byte, error) Recovery(state []byte) error + Clone() Store + SaveNoCopy() ([]byte, error) + JsonStats() []byte DeleteExpiredKeys(cutoff time.Time) } @@ -621,6 +624,24 @@ func (s *store) checkDir(parent *node, dirName string) (*node, *etcdErr.Error) { // It will not save the parent field of the node. Or there will // be cyclic dependencies issue for the json package. func (s *store) Save() ([]byte, error) { + b, err := json.Marshal(s.Clone()) + if err != nil { + return nil, err + } + + return b, nil +} + +func (s *store) SaveNoCopy() ([]byte, error) { + b, err := json.Marshal(s) + if err != nil { + return nil, err + } + + return b, nil +} + +func (s *store) Clone() Store { s.worldLock.Lock() clonedStore := newStore() @@ -631,14 +652,7 @@ func (s *store) Save() ([]byte, error) { clonedStore.CurrentVersion = s.CurrentVersion s.worldLock.Unlock() - - b, err := json.Marshal(clonedStore) - - if err != nil { - return nil, err - } - - return b, nil + return clonedStore } // Recovery recovers the store system from a static state