*: do not block etcdserver when encoding store into json

Encoding store into json snapshot has quite high CPU cost. And it
will block for a while. This commit makes the encoding process non-
blocking by running it in another go-routine.
This commit is contained in:
Xiang Li 2015-02-26 12:39:50 -08:00 committed by Yicheng Qin
parent 9b4d52ee73
commit a4dab7ad75
3 changed files with 74 additions and 41 deletions

View File

@ -432,7 +432,7 @@ func (s *EtcdServer) run() {
if appliedi-snapi > s.r.snapCount {
log.Printf("etcdserver: start to snapshot (applied: %d, lastsnap: %d)", appliedi, snapi)
s.snapshot(appliedi, &confState)
s.snapshot(appliedi, confState)
snapi = appliedi
}
case <-syncC:
@ -814,36 +814,40 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
}
// TODO: non-blocking snapshot
func (s *EtcdServer) snapshot(snapi uint64, confState *raftpb.ConfState) {
d, err := s.store.Save()
// TODO: current store will never fail to do a snapshot
// what should we do if the store might fail?
if err != nil {
log.Panicf("etcdserver: store save should never fail: %v", err)
}
snap, err := s.r.raftStorage.CreateSnapshot(snapi, confState, d)
if err != nil {
// the snapshot was done asynchronously with the progress of raft.
// raft might have already got a newer snapshot.
if err == raft.ErrSnapOutOfDate {
return
}
log.Panicf("etcdserver: unexpected create snapshot error %v", err)
}
if err := s.r.storage.SaveSnap(snap); err != nil {
log.Fatalf("etcdserver: save snapshot error: %v", err)
}
func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
clone := s.store.Clone()
err = s.r.raftStorage.Compact(snapi)
if err != nil {
// the compaction was done asynchronously with the progress of raft.
// raft log might already been compact.
if err == raft.ErrCompacted {
return
go func() {
d, err := clone.SaveNoCopy()
// TODO: current store will never fail to do a snapshot
// what should we do if the store might fail?
if err != nil {
log.Panicf("etcdserver: store save should never fail: %v", err)
}
log.Panicf("etcdserver: unexpected compaction error %v", err)
}
log.Printf("etcdserver: saved snapshot at index %d", snap.Metadata.Index)
snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
if err != nil {
// the snapshot was done asynchronously with the progress of raft.
// raft might have already got a newer snapshot.
if err == raft.ErrSnapOutOfDate {
return
}
log.Panicf("etcdserver: unexpected create snapshot error %v", err)
}
if err := s.r.storage.SaveSnap(snap); err != nil {
log.Fatalf("etcdserver: save snapshot error: %v", err)
}
err = s.r.raftStorage.Compact(snapi)
if err != nil {
// the compaction was done asynchronously with the progress of raft.
// raft log might already been compact.
if err == raft.ErrCompacted {
return
}
log.Panicf("etcdserver: unexpected compaction error %v", err)
}
log.Printf("etcdserver: saved snapshot at index %d", snap.Metadata.Index)
}()
}
func (s *EtcdServer) PauseSending() { s.r.pauseSending() }

View File

@ -711,13 +711,17 @@ func TestSnapshot(t *testing.T) {
},
store: st,
}
srv.snapshot(1, &raftpb.ConfState{Nodes: []uint64{1}})
srv.snapshot(1, raftpb.ConfState{Nodes: []uint64{1}})
testutil.ForceGosched()
gaction := st.Action()
if len(gaction) != 1 {
if len(gaction) != 2 {
t.Fatalf("len(action) = %d, want 1", len(gaction))
}
if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "Save"}) {
t.Errorf("action = %s, want Save", gaction[0])
if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "Clone"}) {
t.Errorf("action = %s, want Clone", gaction[0])
}
if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "SaveNoCopy"}) {
t.Errorf("action = %s, want SaveNoCopy", gaction[1])
}
gaction = p.Action()
if len(gaction) != 1 {
@ -1207,6 +1211,17 @@ func (s *storeRecorder) Recovery(b []byte) error {
s.Record(testutil.Action{Name: "Recovery"})
return nil
}
func (s *storeRecorder) SaveNoCopy() ([]byte, error) {
s.Record(testutil.Action{Name: "SaveNoCopy"})
return nil, nil
}
func (s *storeRecorder) Clone() store.Store {
s.Record(testutil.Action{Name: "Clone"})
return s
}
func (s *storeRecorder) JsonStats() []byte { return nil }
func (s *storeRecorder) DeleteExpiredKeys(cutoff time.Time) {
s.Record(testutil.Action{

View File

@ -56,6 +56,9 @@ type Store interface {
Save() ([]byte, error)
Recovery(state []byte) error
Clone() Store
SaveNoCopy() ([]byte, error)
JsonStats() []byte
DeleteExpiredKeys(cutoff time.Time)
}
@ -621,6 +624,24 @@ func (s *store) checkDir(parent *node, dirName string) (*node, *etcdErr.Error) {
// It will not save the parent field of the node. Or there will
// be cyclic dependencies issue for the json package.
func (s *store) Save() ([]byte, error) {
b, err := json.Marshal(s.Clone())
if err != nil {
return nil, err
}
return b, nil
}
func (s *store) SaveNoCopy() ([]byte, error) {
b, err := json.Marshal(s)
if err != nil {
return nil, err
}
return b, nil
}
func (s *store) Clone() Store {
s.worldLock.Lock()
clonedStore := newStore()
@ -631,14 +652,7 @@ func (s *store) Save() ([]byte, error) {
clonedStore.CurrentVersion = s.CurrentVersion
s.worldLock.Unlock()
b, err := json.Marshal(clonedStore)
if err != nil {
return nil, err
}
return b, nil
return clonedStore
}
// Recovery recovers the store system from a static state