Merge pull request #473 from bcwaldon/fix-peer-timeouts

Use election and heartbeat timeouts when building peer transporter
This commit is contained in:
Xiang Li
2014-01-15 02:11:40 -08:00
5 changed files with 22 additions and 32 deletions

10
etcd.go
View File

@@ -86,15 +86,11 @@ func main() {
registry := server.NewRegistry(store) registry := server.NewRegistry(store)
// Create peer server. // Create peer server.
ps := server.NewPeerServer(info.Name, config.DataDir, info.RaftURL, info.RaftListenHost, &peerTLSConfig, &info.RaftTLS, registry, store, config.SnapshotCount) heartbeatTimeout := time.Duration(config.Peer.HeartbeatTimeout) * time.Millisecond
electionTimeout := time.Duration(config.Peer.ElectionTimeout) * time.Millisecond
ps := server.NewPeerServer(info.Name, config.DataDir, info.RaftURL, info.RaftListenHost, &peerTLSConfig, &info.RaftTLS, registry, store, config.SnapshotCount, heartbeatTimeout, electionTimeout)
ps.MaxClusterSize = config.MaxClusterSize ps.MaxClusterSize = config.MaxClusterSize
ps.RetryTimes = config.MaxRetryAttempts ps.RetryTimes = config.MaxRetryAttempts
if config.Peer.HeartbeatTimeout > 0 {
ps.HeartbeatTimeout = time.Duration(config.Peer.HeartbeatTimeout) * time.Millisecond
}
if config.Peer.ElectionTimeout > 0 {
ps.ElectionTimeout = time.Duration(config.Peer.ElectionTimeout) * time.Millisecond
}
// Create client server. // Create client server.
s := server.New(info.Name, info.EtcdURL, info.EtcdListenHost, &tlsConfig, &info.EtcdTLS, ps, registry, store) s := server.New(info.Name, info.EtcdURL, info.EtcdListenHost, &tlsConfig, &info.EtcdTLS, ps, registry, store)

View File

@@ -89,8 +89,8 @@ func NewConfig() *Config {
c.MaxRetryAttempts = 3 c.MaxRetryAttempts = 3
c.SnapshotCount = 10000 c.SnapshotCount = 10000
c.Peer.Addr = "127.0.0.1:7001" c.Peer.Addr = "127.0.0.1:7001"
c.Peer.HeartbeatTimeout = 0 c.Peer.HeartbeatTimeout = defaultHeartbeatTimeout
c.Peer.ElectionTimeout = 0 c.Peer.ElectionTimeout = defaultElectionTimeout
return c return c
} }

View File

@@ -62,7 +62,7 @@ type snapshotConf struct {
snapshotThr uint64 snapshotThr uint64
} }
func NewPeerServer(name string, path string, url string, bindAddr string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, snapshotCount int) *PeerServer { func NewPeerServer(name string, path string, url string, bindAddr string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, snapshotCount int, heartbeatTimeout, electionTimeout time.Duration) *PeerServer {
s := &PeerServer{ s := &PeerServer{
name: name, name: name,
url: url, url: url,
@@ -85,8 +85,8 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon
back: -1, back: -1,
}, },
}, },
HeartbeatTimeout: defaultHeartbeatTimeout, HeartbeatTimeout: heartbeatTimeout,
ElectionTimeout: defaultElectionTimeout, ElectionTimeout: electionTimeout,
timeoutThresholdChan: make(chan interface{}, 1), timeoutThresholdChan: make(chan interface{}, 1),
} }

View File

@@ -1,13 +1,9 @@
package server package server
import (
"time"
)
const ( const (
// The amount of time to elapse without a heartbeat before becoming a candidate // The amount of time (in ms) to elapse without a heartbeat before becoming a candidate
defaultElectionTimeout = 200 * time.Millisecond defaultElectionTimeout = 200
// The frequency by which heartbeats are sent to followers. // The frequency (in ms) by which heartbeats are sent to followers.
defaultHeartbeatTimeout = 50 * time.Millisecond defaultHeartbeatTimeout = 50
) )

View File

@@ -14,8 +14,8 @@ const (
testClientURL = "localhost:4401" testClientURL = "localhost:4401"
testRaftURL = "localhost:7701" testRaftURL = "localhost:7701"
testSnapshotCount = 10000 testSnapshotCount = 10000
testHeartbeatTimeout = 50 testHeartbeatTimeout = time.Duration(50) * time.Millisecond
testElectionTimeout = 200 testElectionTimeout = time.Duration(200) * time.Millisecond
) )
// Starts a server in a temporary directory. // Starts a server in a temporary directory.
@@ -26,10 +26,8 @@ func RunServer(f func(*server.Server)) {
store := store.New() store := store.New()
registry := server.NewRegistry(store) registry := server.NewRegistry(store)
ps := server.NewPeerServer(testName, path, "http://"+testRaftURL, testRaftURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, testSnapshotCount) ps := server.NewPeerServer(testName, path, "http://"+testRaftURL, testRaftURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, testSnapshotCount, testHeartbeatTimeout, testElectionTimeout)
ps.MaxClusterSize = 9 ps.MaxClusterSize = 9
ps.ElectionTimeout = testElectionTimeout
ps.HeartbeatTimeout = testHeartbeatTimeout
s := server.New(testName, "http://"+testClientURL, testClientURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, ps, registry, store) s := server.New(testName, "http://"+testClientURL, testClientURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, ps, registry, store)
ps.SetServer(s) ps.SetServer(s)