diff --git a/Dockerfile b/Dockerfile index fd624c5fa..4b3a9bf06 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,3 +8,4 @@ ADD . /opt/etcd RUN cd /opt/etcd && ./build EXPOSE 4001 7001 ENTRYPOINT ["/opt/etcd/etcd"] + diff --git a/etcd.go b/etcd.go index 0567c65df..5c3d7d3cb 100644 --- a/etcd.go +++ b/etcd.go @@ -82,7 +82,7 @@ func main() { registry := server.NewRegistry(store) // Create peer server. - ps := server.NewPeerServer(info.Name, config.DataDir, info.RaftURL, info.RaftListenHost, &peerTLSConfig, &info.RaftTLS, registry, store, config.SnapshotCount) + ps := server.NewPeerServer(info.Name, config.DataDir, info.RaftURL, info.RaftListenHost, &peerTLSConfig, &info.RaftTLS, registry, store, config.SnapshotCount, config.HeartbeatTimeout, config.ElectionTimeout) ps.MaxClusterSize = config.MaxClusterSize ps.RetryTimes = config.MaxRetryAttempts diff --git a/server/config.go b/server/config.go index 2e37b9fe4..214ce399d 100644 --- a/server/config.go +++ b/server/config.go @@ -67,7 +67,8 @@ type Config struct { ShowVersion bool Verbose bool `toml:"verbose" env:"ETCD_VERBOSE"` VeryVerbose bool `toml:"very_verbose" env:"ETCD_VERY_VERBOSE"` - + HeartbeatTimeout int `toml:"peer_heartbeat_timeout" env:"ETCD_PEER_HEARTBEAT_TIMEOUT"` + ElectionTimeout int `toml:"peer_election_timeout" env:"ETCD_PEER_ELECTION_TIMEOUT"` Peer struct { Addr string `toml:"addr" env:"ETCD_PEER_ADDR"` BindAddr string `toml:"bind_addr" env:"ETCD_PEER_BIND_ADDR"` @@ -87,6 +88,8 @@ func NewConfig() *Config { c.MaxRetryAttempts = 3 c.Peer.Addr = "127.0.0.1:7001" c.SnapshotCount = 10000 + c.HeartbeatTimeout = HeartbeatTimeout + c.ElectionTimeout = ElectionTimeout return c } @@ -233,6 +236,9 @@ func (c *Config) LoadFlags(arguments []string) error { f.IntVar(&c.MaxResultBuffer, "max-result-buffer", c.MaxResultBuffer, "") f.IntVar(&c.MaxRetryAttempts, "max-retry-attempts", c.MaxRetryAttempts, "") f.IntVar(&c.MaxClusterSize, "max-cluster-size", c.MaxClusterSize, "") + f.IntVar(&c.HeartbeatTimeout, "peer-heartbeat-timeout", c.HeartbeatTimeout, "") + f.IntVar(&c.ElectionTimeout, "peer-election-timeout", c.ElectionTimeout, "") + f.StringVar(&cors, "cors", "", "") f.BoolVar(&c.Snapshot, "snapshot", c.Snapshot, "") diff --git a/server/peer_server.go b/server/peer_server.go index 3f15c22c5..564aa3910 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -38,6 +38,8 @@ type PeerServer struct { snapConf *snapshotConf MaxClusterSize int RetryTimes int + heartbeatTimeout int + electionTimeout int } // TODO: find a good policy to do snapshot @@ -53,7 +55,7 @@ type snapshotConf struct { writesThr uint64 } -func NewPeerServer(name string, path string, url string, bindAddr string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, snapshotCount int) *PeerServer { +func NewPeerServer(name string, path string, url string, bindAddr string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, snapshotCount int, heartbeatTimeout int, electionTimeout int) *PeerServer { s := &PeerServer{ name: name, url: url, @@ -76,6 +78,8 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon back: -1, }, }, + heartbeatTimeout: heartbeatTimeout, + electionTimeout: electionTimeout, } // Create transporter for raft @@ -105,8 +109,8 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error { } } - s.raftServer.SetElectionTimeout(ElectionTimeout) - s.raftServer.SetHeartbeatTimeout(HeartbeatTimeout) + s.raftServer.SetElectionTimeout(time.Duration(s.electionTimeout) * time.Millisecond) + s.raftServer.SetHeartbeatTimeout(time.Duration(s.heartbeatTimeout) * time.Millisecond) s.raftServer.Start() diff --git a/server/timeout.go b/server/timeout.go index fa48c3162..892168b13 100644 --- a/server/timeout.go +++ b/server/timeout.go @@ -1,15 +1,11 @@ package server -import ( - "time" -) - const ( - // The amount of time to elapse without a heartbeat before becoming a candidate. - ElectionTimeout = 200 * time.Millisecond + // The amount of time (ms) to elapse without a heartbeat before becoming a candidate. + ElectionTimeout = 200 // The frequency by which heartbeats are sent to followers. - HeartbeatTimeout = 50 * time.Millisecond + HeartbeatTimeout = 50 RetryInterval = 10 ) diff --git a/server/transporter.go b/server/transporter.go index 9e58a26bd..7cf8f9f04 100644 --- a/server/transporter.go +++ b/server/transporter.go @@ -13,35 +13,30 @@ import ( "github.com/coreos/raft" ) -// Timeout for setup internal raft http connection -// This should not exceed 3 * RTT -var dailTimeout = 3 * HeartbeatTimeout - -// Timeout for setup internal raft http connection + receive all post body -// The raft server will not send back response header until it received all the -// post body. -// This should not exceed dailTimeout + electionTimeout -var responseHeaderTimeout = 3*HeartbeatTimeout + ElectionTimeout - -// Timeout for receiving the response body from the server -// This should not exceed heartbeatTimeout -var tranTimeout = HeartbeatTimeout - // Transporter layer for communication between raft nodes type transporter struct { client *http.Client transport *http.Transport peerServer *PeerServer + tranTimeout time.Duration } +type dialer func(network, addr string) (net.Conn, error) + // Create transporter using by raft server // Create http or https transporter based on // whether the user give the server cert and key func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) *transporter { + // names for each type of timeout, for the sake of clarity + dialTimeout := time.Duration(3 * peerServer.heartbeatTimeout + peerServer.electionTimeout) * time.Millisecond + responseHeaderTimeout := time.Duration(3 * peerServer.heartbeatTimeout + peerServer.electionTimeout) * time.Millisecond + t := transporter{} + t.tranTimeout = time.Duration(peerServer.heartbeatTimeout) * time.Millisecond + tr := &http.Transport{ - Dial: dialWithTimeout, + Dial: dialWithTimeoutFactory(dialTimeout), ResponseHeaderTimeout: responseHeaderTimeout, } @@ -57,9 +52,11 @@ func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) * return &t } -// Dial with timeout -func dialWithTimeout(network, addr string) (net.Conn, error) { - return net.DialTimeout(network, addr, dailTimeout) +// factory function to return a dialer +func dialWithTimeoutFactory( timeout time.Duration ) dialer { + return func(network, addr string) (net.Conn, error) { + return net.DialTimeout(network, addr, timeout) + } } // Sends AppendEntries RPCs to a peer when the server is the leader. @@ -238,7 +235,7 @@ func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error) // Cancel the on fly HTTP transaction when timeout happens. func (t *transporter) CancelWhenTimeout(req *http.Request) { go func() { - time.Sleep(tranTimeout) + time.Sleep(t.tranTimeout) t.transport.CancelRequest(req) }() } diff --git a/server/usage.go b/server/usage.go index e8969c344..0dfd1b311 100644 --- a/server/usage.go +++ b/server/usage.go @@ -38,11 +38,15 @@ Client Communication Options: -key-file= Path to the client key file. Peer Communication Options: - -peer-addr= The public host:port used for peer communication. + -peer-addr= The public host:port used for peer communication. -peer-bind-addr= The listening host:port used for peer communication. - -peer-ca-file= Path to the peer CA file. - -peer-cert-file= Path to the peer cert file. - -peer-key-file= Path to the peer key file. + -peer-ca-file= Path to the peer CA file. + -peer-cert-file= Path to the peer cert file. + -peer-key-file= Path to the peer key file. + -peer-heartbeat-timeout=