From 60302613632fce8200a01bda0bfde11e150f69ef Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 30 Jul 2014 17:21:27 -0700 Subject: [PATCH] etcd/raft: add snap --- etcd/etcd_test.go | 18 ++++++++++++++++++ etcd/participant.go | 9 +++++++++ raft/log.go | 5 +++++ raft/node.go | 12 ++++++++++++ raft/raft.go | 5 +++++ raft/raft_test.go | 45 --------------------------------------------- 6 files changed, 49 insertions(+), 45 deletions(-) diff --git a/etcd/etcd_test.go b/etcd/etcd_test.go index 9089413eb..71864b35b 100644 --- a/etcd/etcd_test.go +++ b/etcd/etcd_test.go @@ -368,6 +368,24 @@ func TestSingleNodeRecovery(t *testing.T) { destroyServer(t, e, h) } +func TestTakingSnapshot(t *testing.T) { + es, hs := buildCluster(1, false) + for i := 0; i < defaultCompact; i++ { + es[0].p.Set("/foo", false, "bar", store.Permanent) + } + snap := es[0].p.node.GetSnap() + if snap.Index != defaultCompact { + t.Errorf("snap.Index = %d, want %d", snap.Index, defaultCompact) + } + + for i := range hs { + es[len(hs)-i-1].Stop() + } + for i := range hs { + hs[len(hs)-i-1].Close() + } +} + func buildCluster(number int, tls bool) ([]*Server, []*httptest.Server) { bootstrapper := 0 es := make([]*Server, number) diff --git a/etcd/participant.go b/etcd/participant.go index bdf62a523..ce9c933f2 100644 --- a/etcd/participant.go +++ b/etcd/participant.go @@ -36,6 +36,7 @@ import ( const ( defaultHeartbeat = 1 defaultElection = 5 + defaultCompact = 10000 maxBufferedProposal = 128 @@ -213,6 +214,14 @@ func (p *participant) run() int64 { log.Printf("id=%x participant.end\n", p.id) return standbyMode } + if p.node.EntsLen() > defaultCompact { + d, err := p.Save() + if err != nil { + panic(err) + } + p.node.Compact(d) + log.Printf("id=%x compacted index=\n", p.id) + } } } diff --git a/raft/log.go b/raft/log.go index 58967b621..e6bd2eb3e 100644 --- a/raft/log.go +++ b/raft/log.go @@ -24,6 +24,7 @@ type raftLog struct { committed int64 applied int64 offset int64 + snapshot Snapshot // want a compact after the number of entries exceeds the threshold // TODO(xiangli) size might be a better criteria @@ -154,6 +155,10 @@ func (l *raftLog) compact(i int64) int64 { return int64(len(l.ents)) } +func (l *raftLog) snap(d []byte, index, term int64, nodes []int64) { + l.snapshot = Snapshot{d, nodes, index, term} +} + func (l *raftLog) shouldCompact() bool { return (l.applied - l.offset) > l.compactThreshold } diff --git a/raft/node.go b/raft/node.go index 361cfa154..cee1bbfbc 100644 --- a/raft/node.go +++ b/raft/node.go @@ -231,3 +231,15 @@ func (n *Node) UnstableState() State { n.sm.clearState() return s } + +func (n *Node) GetSnap() Snapshot { + return n.sm.raftLog.snapshot +} + +func (n *Node) Compact(d []byte) { + n.sm.compact(d) +} + +func (n *Node) EntsLen() int { + return len(n.sm.raftLog.ents) +} diff --git a/raft/raft.go b/raft/raft.go index dbb8d27ce..a84caadc0 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -513,6 +513,11 @@ func (sm *stateMachine) maybeCompact() bool { return true } +func (sm *stateMachine) compact(d []byte) { + sm.raftLog.snap(d, sm.raftLog.applied, sm.raftLog.term(sm.raftLog.applied), sm.nodes()) + sm.raftLog.compact(sm.raftLog.applied) +} + // restore recovers the statemachine from a snapshot. It restores the log and the // configuration of statemachine. It calls the snapshoter to restore from the given // snapshot. diff --git a/raft/raft_test.go b/raft/raft_test.go index f11c3ccaa..795cc0a05 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -774,51 +774,6 @@ func TestRecvMsgBeat(t *testing.T) { } } -func TestMaybeCompact(t *testing.T) { - tests := []struct { - snapshoter Snapshoter - applied int64 - wCompact bool - }{ - {nil, defaultCompactThreshold + 1, false}, - {new(logSnapshoter), defaultCompactThreshold - 1, false}, - {new(logSnapshoter), defaultCompactThreshold + 1, true}, - } - - for i, tt := range tests { - sm := newStateMachine(0, []int64{0, 1, 2}) - sm.setSnapshoter(tt.snapshoter) - for i := 0; i < defaultCompactThreshold*2; i++ { - sm.raftLog.append(int64(i), Entry{Term: int64(i + 1)}) - } - sm.raftLog.applied = tt.applied - sm.raftLog.committed = tt.applied - - if g := sm.maybeCompact(); g != tt.wCompact { - t.Errorf("#%d: compact = %v, want %v", i, g, tt.wCompact) - } - - if tt.wCompact { - s := sm.snapshoter.GetSnap() - if s.Index != tt.applied { - t.Errorf("#%d: snap.Index = %v, want %v", i, s.Index, tt.applied) - } - if s.Term != tt.applied { - t.Errorf("#%d: snap.Term = %v, want %v", i, s.Index, tt.applied) - } - - w := sm.nodes() - sw := int64Slice(w) - sg := int64Slice(s.Nodes) - sort.Sort(sw) - sort.Sort(sg) - if !reflect.DeepEqual(sg, sw) { - t.Errorf("#%d: snap.Nodes = %+v, want %+v", i, sg, sw) - } - } - } -} - func TestRestore(t *testing.T) { s := Snapshot{ Index: defaultCompactThreshold + 1,