Merge pull request #462 from xiangli-cmu/sync_cnt_for_snapshot

fix(snapshot) count num of log entries rather than etcd transcations
This commit is contained in:
Xiang Li 2014-01-09 05:29:18 -08:00
commit 7bfd11679b

View File

@ -45,7 +45,7 @@ type PeerServer struct {
HeartbeatTimeout time.Duration
ElectionTimeout time.Duration
closeChan chan bool
closeChan chan bool
timeoutThresholdChan chan interface{}
}
@ -54,12 +54,12 @@ type snapshotConf struct {
// Etcd will check if snapshot is need every checkingInterval
checkingInterval time.Duration
// The number of writes when the last snapshot happened
lastWrites uint64
// The index when the last snapshot happened
lastIndex uint64
// If the incremental number of writes since the last snapshot
// exceeds the write Threshold, etcd will do a snapshot
writesThr uint64
// If the incremental number of index since the last snapshot
// exceeds the snapshot Threshold, etcd will do a snapshot
snapshotThr uint64
}
func NewPeerServer(name string, path string, url string, bindAddr string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, snapshotCount int) *PeerServer {
@ -71,7 +71,6 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon
tlsInfo: tlsInfo,
registry: registry,
store: store,
snapConf: &snapshotConf{time.Second * 3, 0, uint64(snapshotCount)},
followersStats: &raftFollowersStats{
Leader: name,
Followers: make(map[string]*raftFollowerStats),
@ -101,6 +100,13 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon
log.Fatal(err)
}
s.snapConf = &snapshotConf{
checkingInterval: time.Second * 3,
// this is not accurate, we will update raft to provide an api
lastIndex: raftServer.CommitIndex(),
snapshotThr: uint64(snapshotCount),
}
s.raftServer = raftServer
s.raftServer.AddEventListener(raft.StateChangeEventType, s.raftEventLogger)
s.raftServer.AddEventListener(raft.LeaderChangeEventType, s.raftEventLogger)
@ -483,17 +489,19 @@ func (s *PeerServer) raftEventLogger(event raft.Event) {
case s.timeoutThresholdChan <- value:
default:
}
}
}
func (s *PeerServer) monitorSnapshot() {
for {
time.Sleep(s.snapConf.checkingInterval)
currentWrites := s.store.TotalTransactions() - s.snapConf.lastWrites
if uint64(currentWrites) > s.snapConf.writesThr {
currentIndex := s.RaftServer().CommitIndex()
count := currentIndex - s.snapConf.lastIndex
if uint64(count) > s.snapConf.snapshotThr {
s.raftServer.TakeSnapshot()
s.snapConf.lastWrites = s.store.TotalTransactions()
s.snapConf.lastIndex = currentIndex
}
}
}