mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: support to create raft snapshot at apply loop
and snapStore could trigger it to create the latest raft snapshot.
This commit is contained in:
parent
ccce61bda9
commit
bfe9502f4f
@ -32,6 +32,20 @@ func newRaftStorage() *raftStorage {
|
||||
}
|
||||
}
|
||||
|
||||
func (rs *raftStorage) reqsnap() <-chan struct{} {
|
||||
if rs.snapStore == nil {
|
||||
return nil
|
||||
}
|
||||
return rs.snapStore.reqsnapc
|
||||
}
|
||||
|
||||
func (rs *raftStorage) raftsnap() chan<- raftpb.Snapshot {
|
||||
if rs.snapStore == nil {
|
||||
return nil
|
||||
}
|
||||
return rs.snapStore.raftsnapc
|
||||
}
|
||||
|
||||
// Snapshot returns raft snapshot. If snapStore is nil, this method
|
||||
// returns snapshot saved in MemoryStorage. If snapStore exists, this method
|
||||
// returns snapshot from snapStore.
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
"errors"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"os"
|
||||
@ -509,6 +510,9 @@ func (s *EtcdServer) run() {
|
||||
s.snapshot(appliedi, confState)
|
||||
snapi = appliedi
|
||||
}
|
||||
case <-s.r.raftStorage.reqsnap():
|
||||
s.r.raftStorage.raftsnap() <- s.createRaftSnapshot(appliedi, confState)
|
||||
plog.Infof("requested snapshot created at %d", snapi)
|
||||
case err := <-s.errorc:
|
||||
plog.Errorf("%s", err)
|
||||
plog.Infof("the data-dir used by this member must be removed.")
|
||||
@ -928,6 +932,29 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// createRaftSnapshot creates a raft snapshot that includes the state of store for v2 api.
|
||||
func (s *EtcdServer) createRaftSnapshot(snapi uint64, confState raftpb.ConfState) raftpb.Snapshot {
|
||||
snapt, err := s.r.raftStorage.Term(snapi)
|
||||
if err != nil {
|
||||
log.Panicf("get term should never fail: %v", err)
|
||||
}
|
||||
|
||||
clone := s.store.Clone()
|
||||
d, err := clone.SaveNoCopy()
|
||||
if err != nil {
|
||||
plog.Panicf("store save should never fail: %v", err)
|
||||
}
|
||||
|
||||
return raftpb.Snapshot{
|
||||
Metadata: raftpb.SnapshotMetadata{
|
||||
Index: snapi,
|
||||
Term: snapt,
|
||||
ConfState: confState,
|
||||
},
|
||||
Data: d,
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: non-blocking snapshot
|
||||
func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
|
||||
clone := s.store.Clone()
|
||||
|
@ -698,6 +698,48 @@ func TestSyncTrigger(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateRaftSnapshot(t *testing.T) {
|
||||
s := newRaftStorage()
|
||||
s.Append([]raftpb.Entry{{Index: 1, Term: 1}})
|
||||
st := &storeRecorder{}
|
||||
srv := &EtcdServer{
|
||||
r: raftNode{
|
||||
raftStorage: s,
|
||||
},
|
||||
store: st,
|
||||
}
|
||||
|
||||
snap := srv.createRaftSnapshot(1, raftpb.ConfState{Nodes: []uint64{1}})
|
||||
wdata, err := st.Save()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
wsnap := raftpb.Snapshot{
|
||||
Metadata: raftpb.SnapshotMetadata{
|
||||
Index: 1,
|
||||
Term: 1,
|
||||
ConfState: raftpb.ConfState{Nodes: []uint64{1}},
|
||||
},
|
||||
Data: wdata,
|
||||
}
|
||||
if !reflect.DeepEqual(snap, wsnap) {
|
||||
t.Errorf("snap = %+v, want %+v", snap, wsnap)
|
||||
}
|
||||
|
||||
gaction := st.Action()
|
||||
// the third action is store.Save used in testing
|
||||
if len(gaction) != 3 {
|
||||
t.Fatalf("len(action) = %d, want 3", len(gaction))
|
||||
}
|
||||
if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "Clone"}) {
|
||||
t.Errorf("action = %s, want Clone", gaction[0])
|
||||
}
|
||||
if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "SaveNoCopy"}) {
|
||||
t.Errorf("action = %s, want SaveNoCopy", gaction[1])
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// snapshot should snapshot the store and cut the persistent
|
||||
func TestSnapshot(t *testing.T) {
|
||||
s := newRaftStorage()
|
||||
|
@ -47,19 +47,19 @@ type snapshotStore struct {
|
||||
// send empty to reqsnapc to notify the channel receiver to send back latest
|
||||
// snapshot to snapc
|
||||
reqsnapc chan struct{}
|
||||
// a chan to receive the requested snapshot
|
||||
// a chan to receive the requested raft snapshot
|
||||
// snapshotStore will receive from the chan immediately after it sends empty to reqsnapc
|
||||
snapc chan raftpb.Snapshot
|
||||
raftsnapc chan raftpb.Snapshot
|
||||
|
||||
snap *snapshot
|
||||
}
|
||||
|
||||
func newSnapshotStore(dir string, kv dstorage.KV) *snapshotStore {
|
||||
return &snapshotStore{
|
||||
dir: dir,
|
||||
kv: kv,
|
||||
reqsnapc: make(chan struct{}),
|
||||
snapc: make(chan raftpb.Snapshot),
|
||||
dir: dir,
|
||||
kv: kv,
|
||||
reqsnapc: make(chan struct{}),
|
||||
raftsnapc: make(chan raftpb.Snapshot),
|
||||
}
|
||||
}
|
||||
|
||||
@ -73,7 +73,7 @@ func (ss *snapshotStore) getSnap() (*snapshot, error) {
|
||||
// ask to generate v2 snapshot
|
||||
ss.reqsnapc <- struct{}{}
|
||||
// TODO: generate v3 snapshot at here
|
||||
raftsnap := <-ss.snapc
|
||||
raftsnap := <-ss.raftsnapc
|
||||
ss.snap = &snapshot{
|
||||
r: raftsnap,
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user