From ded3cc24c0fb47d60695a2d2ffe731664576a074 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 2 Dec 2013 22:53:36 -0500 Subject: [PATCH 01/10] fix redirect url should include rawquery --- server/v2/get_handler.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/server/v2/get_handler.go b/server/v2/get_handler.go index 2f48fc32a..9a67ea2ae 100644 --- a/server/v2/get_handler.go +++ b/server/v2/get_handler.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "net/http" + "net/url" "strconv" etcdErr "github.com/coreos/etcd/error" @@ -24,9 +25,17 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error { if req.FormValue("consistent") == "true" && s.State() != raft.Leader { leader := s.Leader() hostname, _ := s.ClientURL(leader) - url := hostname + req.URL.Path - log.Debugf("Redirect consistent get to %s", url) - http.Redirect(w, req, url, http.StatusTemporaryRedirect) + + url, err := url.Parse(hostname) + if err != nil { + log.Warn("Redirect cannot parse hostName ", hostname) + return err + } + url.RawQuery = req.URL.RawQuery + url.Path = req.URL.Path + + log.Debugf("Redirect consistent get to %s", url.String()) + http.Redirect(w, req, url.String(), http.StatusTemporaryRedirect) return nil } From 46f8a354d1c37319d932d2eb87d763212149e828 Mon Sep 17 00:00:00 2001 From: Neil Dunbar Date: Wed, 4 Dec 2013 16:58:44 +0000 Subject: [PATCH 02/10] Added the ability to specify heartbeat and election timeouts as config parameters. --- Dockerfile | 2 +- etcd.go | 2 +- server/config.go | 6 +++++- server/peer_server.go | 10 +++++++--- server/server.go | 1 + server/timeout.go | 12 +++++++----- server/transporter.go | 33 +++++++++++++-------------------- server/usage.go | 4 ++++ 8 files changed, 39 insertions(+), 31 deletions(-) diff --git a/Dockerfile b/Dockerfile index e025979db..afcf0afcc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,4 +7,4 @@ RUN apt-get install -y golang ADD . /opt/etcd RUN cd /opt/etcd && ./build EXPOSE 4001 7001 -ENTRYPOINT ["/opt/etcd/etcd", "-addr", "0.0.0.0:4001", "-bind-addr", "0.0.0.0:7001"] +ENTRYPOINT ["/datastore/start.sh"] diff --git a/etcd.go b/etcd.go index b72880d0f..090bda6a1 100644 --- a/etcd.go +++ b/etcd.go @@ -92,7 +92,7 @@ func main() { registry := server.NewRegistry(store) // Create peer server. - ps := server.NewPeerServer(info.Name, config.DataDir, info.RaftURL, info.RaftListenHost, &peerTLSConfig, &info.RaftTLS, registry, store, config.SnapshotCount) + ps := server.NewPeerServer(info.Name, config.DataDir, info.RaftURL, info.RaftListenHost, &peerTLSConfig, &info.RaftTLS, registry, store, config.SnapshotCount, config.HeartbeatTimeout, config.ElectionTimeout) ps.MaxClusterSize = config.MaxClusterSize ps.RetryTimes = config.MaxRetryAttempts diff --git a/server/config.go b/server/config.go index 939a2580d..4ec26b876 100644 --- a/server/config.go +++ b/server/config.go @@ -67,7 +67,8 @@ type Config struct { ShowVersion bool Verbose bool `toml:"verbose" env:"ETCD_VERBOSE"` VeryVerbose bool `toml:"very_verbose" env:"ETCD_VERY_VERBOSE"` - + HeartbeatTimeout int + ElectionTimeout int Peer struct { Addr string `toml:"addr" env:"ETCD_PEER_ADDR"` BindAddr string `toml:"bind_addr" env:"ETCD_PEER_BIND_ADDR"` @@ -228,6 +229,9 @@ func (c *Config) LoadFlags(arguments []string) error { f.IntVar(&c.MaxResultBuffer, "max-result-buffer", c.MaxResultBuffer, "") f.IntVar(&c.MaxRetryAttempts, "max-retry-attempts", c.MaxRetryAttempts, "") f.IntVar(&c.MaxClusterSize, "max-cluster-size", c.MaxClusterSize, "") + f.IntVar(&c.HeartbeatTimeout, "peer-heartbeat-timeout", HeartbeatTimeout, "") + f.IntVar(&c.ElectionTimeout, "peer-election-timeout", ElectionTimeout, "") + f.StringVar(&cors, "cors", "", "") f.BoolVar(&c.Snapshot, "snapshot", c.Snapshot, "") diff --git a/server/peer_server.go b/server/peer_server.go index 3f15c22c5..564aa3910 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -38,6 +38,8 @@ type PeerServer struct { snapConf *snapshotConf MaxClusterSize int RetryTimes int + heartbeatTimeout int + electionTimeout int } // TODO: find a good policy to do snapshot @@ -53,7 +55,7 @@ type snapshotConf struct { writesThr uint64 } -func NewPeerServer(name string, path string, url string, bindAddr string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, snapshotCount int) *PeerServer { +func NewPeerServer(name string, path string, url string, bindAddr string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, snapshotCount int, heartbeatTimeout int, electionTimeout int) *PeerServer { s := &PeerServer{ name: name, url: url, @@ -76,6 +78,8 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon back: -1, }, }, + heartbeatTimeout: heartbeatTimeout, + electionTimeout: electionTimeout, } // Create transporter for raft @@ -105,8 +109,8 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error { } } - s.raftServer.SetElectionTimeout(ElectionTimeout) - s.raftServer.SetHeartbeatTimeout(HeartbeatTimeout) + s.raftServer.SetElectionTimeout(time.Duration(s.electionTimeout) * time.Millisecond) + s.raftServer.SetHeartbeatTimeout(time.Duration(s.heartbeatTimeout) * time.Millisecond) s.raftServer.Start() diff --git a/server/server.go b/server/server.go index 00c39227a..58b0f642d 100644 --- a/server/server.go +++ b/server/server.go @@ -227,6 +227,7 @@ func (s *Server) listenAndServeTLS(certFile, keyFile string) error { tlsListener := tls.NewListener(conn, config) s.listener = tlsListener + log.Debugf("etcd listening using tls key %s, cert %s", keyFile, certFile) return s.Server.Serve(tlsListener) } diff --git a/server/timeout.go b/server/timeout.go index fa48c3162..2838354b0 100644 --- a/server/timeout.go +++ b/server/timeout.go @@ -1,15 +1,17 @@ package server -import ( - "time" -) +// import ( +// "time" +// ) const ( // The amount of time to elapse without a heartbeat before becoming a candidate. - ElectionTimeout = 200 * time.Millisecond + // ElectionTimeout = 200 * time.Millisecond + ElectionTimeout = 200 // The frequency by which heartbeats are sent to followers. - HeartbeatTimeout = 50 * time.Millisecond + //HeartbeatTimeout = 50 * time.Millisecond + HeartbeatTimeout = 50 RetryInterval = 10 ) diff --git a/server/transporter.go b/server/transporter.go index 9e58a26bd..6616e08ed 100644 --- a/server/transporter.go +++ b/server/transporter.go @@ -13,36 +13,27 @@ import ( "github.com/coreos/raft" ) -// Timeout for setup internal raft http connection -// This should not exceed 3 * RTT -var dailTimeout = 3 * HeartbeatTimeout - -// Timeout for setup internal raft http connection + receive all post body -// The raft server will not send back response header until it received all the -// post body. -// This should not exceed dailTimeout + electionTimeout -var responseHeaderTimeout = 3*HeartbeatTimeout + ElectionTimeout - -// Timeout for receiving the response body from the server -// This should not exceed heartbeatTimeout -var tranTimeout = HeartbeatTimeout - // Transporter layer for communication between raft nodes type transporter struct { client *http.Client transport *http.Transport peerServer *PeerServer + tranTimeout time.Duration } +type dialer func(network, addr string) (net.Conn, error) + // Create transporter using by raft server // Create http or https transporter based on // whether the user give the server cert and key func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) *transporter { t := transporter{} + t.tranTimeout = time.Duration(peerServer.heartbeatTimeout) * time.Millisecond + tr := &http.Transport{ - Dial: dialWithTimeout, - ResponseHeaderTimeout: responseHeaderTimeout, + Dial: dialWithTimeoutFactory( time.Duration(3 * peerServer.heartbeatTimeout + peerServer.electionTimeout) * time.Millisecond), + ResponseHeaderTimeout: time.Duration(3 * peerServer.heartbeatTimeout + peerServer.electionTimeout) * time.Millisecond, } if scheme == "https" { @@ -57,9 +48,11 @@ func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) * return &t } -// Dial with timeout -func dialWithTimeout(network, addr string) (net.Conn, error) { - return net.DialTimeout(network, addr, dailTimeout) +// factory function to return a dialer +func dialWithTimeoutFactory( timeout time.Duration ) dialer { + return func(network, addr string) (net.Conn, error) { + return net.DialTimeout(network, addr, timeout) + } } // Sends AppendEntries RPCs to a peer when the server is the leader. @@ -238,7 +231,7 @@ func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error) // Cancel the on fly HTTP transaction when timeout happens. func (t *transporter) CancelWhenTimeout(req *http.Request) { go func() { - time.Sleep(tranTimeout) + time.Sleep(t.tranTimeout) t.transport.CancelRequest(req) }() } diff --git a/server/usage.go b/server/usage.go index 3809fb04b..b0606ab66 100644 --- a/server/usage.go +++ b/server/usage.go @@ -43,6 +43,10 @@ Peer Communication Options: -peer-ca-file= Path to the peer CA file. -peer-cert-file= Path to the peer cert file. -peer-key-file= Path to the peer key file. + -peer-heartbeat-timeout=