fix(snapshot) count num of log entries rather than etcd transcations

This commit is contained in:
Xiang Li 2014-01-09 21:28:09 +08:00
parent 6b77b94127
commit f250649a5e

View File

@ -45,7 +45,7 @@ type PeerServer struct {
HeartbeatTimeout time.Duration HeartbeatTimeout time.Duration
ElectionTimeout time.Duration ElectionTimeout time.Duration
closeChan chan bool closeChan chan bool
timeoutThresholdChan chan interface{} timeoutThresholdChan chan interface{}
} }
@ -54,12 +54,12 @@ type snapshotConf struct {
// Etcd will check if snapshot is need every checkingInterval // Etcd will check if snapshot is need every checkingInterval
checkingInterval time.Duration checkingInterval time.Duration
// The number of writes when the last snapshot happened // The index when the last snapshot happened
lastWrites uint64 lastIndex uint64
// If the incremental number of writes since the last snapshot // If the incremental number of index since the last snapshot
// exceeds the write Threshold, etcd will do a snapshot // exceeds the snapshot Threshold, etcd will do a snapshot
writesThr uint64 snapshotThr uint64
} }
func NewPeerServer(name string, path string, url string, bindAddr string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, snapshotCount int) *PeerServer { func NewPeerServer(name string, path string, url string, bindAddr string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, snapshotCount int) *PeerServer {
@ -71,7 +71,6 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon
tlsInfo: tlsInfo, tlsInfo: tlsInfo,
registry: registry, registry: registry,
store: store, store: store,
snapConf: &snapshotConf{time.Second * 3, 0, uint64(snapshotCount)},
followersStats: &raftFollowersStats{ followersStats: &raftFollowersStats{
Leader: name, Leader: name,
Followers: make(map[string]*raftFollowerStats), Followers: make(map[string]*raftFollowerStats),
@ -101,6 +100,13 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon
log.Fatal(err) log.Fatal(err)
} }
s.snapConf = &snapshotConf{
checkingInterval: time.Second * 3,
// this is not accurate, we will update raft to provide an api
lastIndex: raftServer.CommitIndex(),
snapshotThr: uint64(snapshotCount),
}
s.raftServer = raftServer s.raftServer = raftServer
s.raftServer.AddEventListener(raft.StateChangeEventType, s.raftEventLogger) s.raftServer.AddEventListener(raft.StateChangeEventType, s.raftEventLogger)
s.raftServer.AddEventListener(raft.LeaderChangeEventType, s.raftEventLogger) s.raftServer.AddEventListener(raft.LeaderChangeEventType, s.raftEventLogger)
@ -483,17 +489,19 @@ func (s *PeerServer) raftEventLogger(event raft.Event) {
case s.timeoutThresholdChan <- value: case s.timeoutThresholdChan <- value:
default: default:
} }
} }
} }
func (s *PeerServer) monitorSnapshot() { func (s *PeerServer) monitorSnapshot() {
for { for {
time.Sleep(s.snapConf.checkingInterval) time.Sleep(s.snapConf.checkingInterval)
currentWrites := s.store.TotalTransactions() - s.snapConf.lastWrites currentIndex := s.RaftServer().CommitIndex()
if uint64(currentWrites) > s.snapConf.writesThr {
count := currentIndex - s.snapConf.lastIndex
if uint64(count) > s.snapConf.snapshotThr {
s.raftServer.TakeSnapshot() s.raftServer.TakeSnapshot()
s.snapConf.lastWrites = s.store.TotalTransactions() s.snapConf.lastIndex = currentIndex
} }
} }
} }