From f250649a5e2b4781e490bdfc922627699bfcfb1d Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 9 Jan 2014 21:28:09 +0800 Subject: [PATCH] fix(snapshot) count num of log entries rather than etcd transcations --- server/peer_server.go | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/server/peer_server.go b/server/peer_server.go index dc2de40e6..87e450eb1 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -45,7 +45,7 @@ type PeerServer struct { HeartbeatTimeout time.Duration ElectionTimeout time.Duration - closeChan chan bool + closeChan chan bool timeoutThresholdChan chan interface{} } @@ -54,12 +54,12 @@ type snapshotConf struct { // Etcd will check if snapshot is need every checkingInterval checkingInterval time.Duration - // The number of writes when the last snapshot happened - lastWrites uint64 + // The index when the last snapshot happened + lastIndex uint64 - // If the incremental number of writes since the last snapshot - // exceeds the write Threshold, etcd will do a snapshot - writesThr uint64 + // If the incremental number of index since the last snapshot + // exceeds the snapshot Threshold, etcd will do a snapshot + snapshotThr uint64 } func NewPeerServer(name string, path string, url string, bindAddr string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, snapshotCount int) *PeerServer { @@ -71,7 +71,6 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon tlsInfo: tlsInfo, registry: registry, store: store, - snapConf: &snapshotConf{time.Second * 3, 0, uint64(snapshotCount)}, followersStats: &raftFollowersStats{ Leader: name, Followers: make(map[string]*raftFollowerStats), @@ -101,6 +100,13 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon log.Fatal(err) } + s.snapConf = &snapshotConf{ + checkingInterval: time.Second * 3, + // this is not accurate, we will update raft to provide an api + lastIndex: raftServer.CommitIndex(), + snapshotThr: uint64(snapshotCount), + } + s.raftServer = raftServer s.raftServer.AddEventListener(raft.StateChangeEventType, s.raftEventLogger) s.raftServer.AddEventListener(raft.LeaderChangeEventType, s.raftEventLogger) @@ -483,17 +489,19 @@ func (s *PeerServer) raftEventLogger(event raft.Event) { case s.timeoutThresholdChan <- value: default: } - + } } func (s *PeerServer) monitorSnapshot() { for { time.Sleep(s.snapConf.checkingInterval) - currentWrites := s.store.TotalTransactions() - s.snapConf.lastWrites - if uint64(currentWrites) > s.snapConf.writesThr { + currentIndex := s.RaftServer().CommitIndex() + + count := currentIndex - s.snapConf.lastIndex + if uint64(count) > s.snapConf.snapshotThr { s.raftServer.TakeSnapshot() - s.snapConf.lastWrites = s.store.TotalTransactions() + s.snapConf.lastIndex = currentIndex } } }