diff --git a/etcdserver/raft_storage.go b/etcdserver/raft_storage.go index bbc3010be..f3fc2250e 100644 --- a/etcdserver/raft_storage.go +++ b/etcdserver/raft_storage.go @@ -32,6 +32,20 @@ func newRaftStorage() *raftStorage { } } +func (rs *raftStorage) reqsnap() <-chan struct{} { + if rs.snapStore == nil { + return nil + } + return rs.snapStore.reqsnapc +} + +func (rs *raftStorage) raftsnap() chan<- raftpb.Snapshot { + if rs.snapStore == nil { + return nil + } + return rs.snapStore.raftsnapc +} + // Snapshot returns raft snapshot. If snapStore is nil, this method // returns snapshot saved in MemoryStorage. If snapStore exists, this method // returns snapshot from snapStore. diff --git a/etcdserver/server.go b/etcdserver/server.go index d4b4547c7..80f205efe 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -19,6 +19,7 @@ import ( "errors" "expvar" "fmt" + "log" "math/rand" "net/http" "os" @@ -506,6 +507,9 @@ func (s *EtcdServer) run() { s.snapshot(appliedi, confState) snapi = appliedi } + case <-s.r.raftStorage.reqsnap(): + s.r.raftStorage.raftsnap() <- s.createRaftSnapshot(appliedi, confState) + plog.Infof("requested snapshot created at %d", snapi) case err := <-s.errorc: plog.Errorf("%s", err) plog.Infof("the data-dir used by this member must be removed.") @@ -925,6 +929,29 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con return false, nil } +// createRaftSnapshot creates a raft snapshot that includes the state of store for v2 api. +func (s *EtcdServer) createRaftSnapshot(snapi uint64, confState raftpb.ConfState) raftpb.Snapshot { + snapt, err := s.r.raftStorage.Term(snapi) + if err != nil { + log.Panicf("get term should never fail: %v", err) + } + + clone := s.store.Clone() + d, err := clone.SaveNoCopy() + if err != nil { + plog.Panicf("store save should never fail: %v", err) + } + + return raftpb.Snapshot{ + Metadata: raftpb.SnapshotMetadata{ + Index: snapi, + Term: snapt, + ConfState: confState, + }, + Data: d, + } +} + // TODO: non-blocking snapshot func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { clone := s.store.Clone() diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 1c041b436..f385b1441 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -698,6 +698,48 @@ func TestSyncTrigger(t *testing.T) { } } +func TestCreateRaftSnapshot(t *testing.T) { + s := newRaftStorage() + s.Append([]raftpb.Entry{{Index: 1, Term: 1}}) + st := &storeRecorder{} + srv := &EtcdServer{ + r: raftNode{ + raftStorage: s, + }, + store: st, + } + + snap := srv.createRaftSnapshot(1, raftpb.ConfState{Nodes: []uint64{1}}) + wdata, err := st.Save() + if err != nil { + t.Fatal(err) + } + wsnap := raftpb.Snapshot{ + Metadata: raftpb.SnapshotMetadata{ + Index: 1, + Term: 1, + ConfState: raftpb.ConfState{Nodes: []uint64{1}}, + }, + Data: wdata, + } + if !reflect.DeepEqual(snap, wsnap) { + t.Errorf("snap = %+v, want %+v", snap, wsnap) + } + + gaction := st.Action() + // the third action is store.Save used in testing + if len(gaction) != 3 { + t.Fatalf("len(action) = %d, want 3", len(gaction)) + } + if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "Clone"}) { + t.Errorf("action = %s, want Clone", gaction[0]) + } + if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "SaveNoCopy"}) { + t.Errorf("action = %s, want SaveNoCopy", gaction[1]) + } + +} + // snapshot should snapshot the store and cut the persistent func TestSnapshot(t *testing.T) { s := newRaftStorage() diff --git a/etcdserver/snapshot_store.go b/etcdserver/snapshot_store.go index 62eb5304f..ac8ae5364 100644 --- a/etcdserver/snapshot_store.go +++ b/etcdserver/snapshot_store.go @@ -47,19 +47,19 @@ type snapshotStore struct { // send empty to reqsnapc to notify the channel receiver to send back latest // snapshot to snapc reqsnapc chan struct{} - // a chan to receive the requested snapshot + // a chan to receive the requested raft snapshot // snapshotStore will receive from the chan immediately after it sends empty to reqsnapc - snapc chan raftpb.Snapshot + raftsnapc chan raftpb.Snapshot snap *snapshot } func newSnapshotStore(dir string, kv dstorage.KV) *snapshotStore { return &snapshotStore{ - dir: dir, - kv: kv, - reqsnapc: make(chan struct{}), - snapc: make(chan raftpb.Snapshot), + dir: dir, + kv: kv, + reqsnapc: make(chan struct{}), + raftsnapc: make(chan raftpb.Snapshot), } } @@ -73,7 +73,7 @@ func (ss *snapshotStore) getSnap() (*snapshot, error) { // ask to generate v2 snapshot ss.reqsnapc <- struct{}{} // TODO: generate v3 snapshot at here - raftsnap := <-ss.snapc + raftsnap := <-ss.raftsnapc ss.snap = &snapshot{ r: raftsnap, }