From 5a91937367840dd4b376ba176593cfad7497759f Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 11 Aug 2015 21:09:03 -0700 Subject: [PATCH 1/2] etcdserver: adjust commit timeout based on config It uses the heartbeat interval to estimate the commit timeout for internal requests. This PR helps etcd survive in high round-trip-time environments, e.g., globally deployed clusters. --- etcdserver/config.go | 9 +++++++++ etcdserver/raft.go | 2 +- etcdserver/server.go | 10 +++------- etcdserver/server_test.go | 10 ++++++++++ 4 files changed, 23 insertions(+), 8 deletions(-) diff --git a/etcdserver/config.go b/etcdserver/config.go index b19eae375..ba62c7087 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -20,6 +20,7 @@ import ( "path" "reflect" "sort" + "time" "github.com/coreos/etcd/pkg/types" ) @@ -110,6 +111,14 @@ func (c *ServerConfig) SnapDir() string { return path.Join(c.MemberDir(), "snap" func (c *ServerConfig) ShouldDiscover() bool { return c.DiscoveryURL != "" } +// CommitTimeout returns commit timeout under normal case. +func (c *ServerConfig) CommitTimeout() time.Duration { + // We assume that heartbeat >= TTL. 
+ // 5s for queue waiting, computation and disk IO delay + // + 2 * heartbeat(TTL) for expected time between proposal by follower and commit at the follower + return 5*time.Second + 2*time.Duration(c.TickMs)*time.Millisecond +} + func (c *ServerConfig) PrintWithInitial() { c.print(true) } func (c *ServerConfig) Print() { c.print(false) } diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 16838af03..336be62bc 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -175,7 +175,7 @@ func (r *raftNode) start(s *EtcdServer) { } r.Advance() case <-syncC: - r.s.sync(defaultSyncTimeout) + r.s.sync(r.s.cfg.CommitTimeout()) case <-r.stopped: return } diff --git a/etcdserver/server.go b/etcdserver/server.go index 995912a10..fe3fcd49f 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -54,17 +54,13 @@ const ( // owner can make/remove files inside the directory privateDirMode = 0700 - defaultSyncTimeout = time.Second - DefaultSnapCount = 10000 - // TODO: calculate based on heartbeat interval - defaultPublishTimeout = 5 * time.Second + DefaultSnapCount = 10000 StoreClusterPrefix = "/0" StoreKeysPrefix = "/1" purgeFileInterval = 30 * time.Second monitorVersionInterval = 5 * time.Second - versionUpdateTimeout = 1 * time.Second ) var ( @@ -347,7 +343,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { // It also starts a goroutine to publish its server information. 
func (s *EtcdServer) Start() { s.start() - go s.publish(defaultPublishTimeout) + go s.publish(s.cfg.CommitTimeout()) go s.purgeFile() go monitorFileDescriptor(s.done) go s.monitorVersions() @@ -1005,7 +1001,7 @@ func (s *EtcdServer) updateClusterVersion(ver string) { Path: path.Join(StoreClusterPrefix, "version"), Val: ver, } - ctx, cancel := context.WithTimeout(context.Background(), versionUpdateTimeout) + ctx, cancel := context.WithTimeout(context.Background(), s.cfg.CommitTimeout()) _, err := s.Do(ctx, req) cancel() switch err { diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index d9d93ba5c..f1cc36e01 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -517,6 +517,7 @@ func TestDoProposal(t *testing.T) { for i, tt := range tests { st := &storeRecorder{} srv := &EtcdServer{ + cfg: &ServerConfig{TickMs: 1}, r: raftNode{ Node: newNodeCommitter(), storage: &storageRecorder{}, @@ -547,6 +548,7 @@ func TestDoProposal(t *testing.T) { func TestDoProposalCancelled(t *testing.T) { wait := &waitRecorder{} srv := &EtcdServer{ + cfg: &ServerConfig{TickMs: 1}, r: raftNode{Node: &nodeRecorder{}}, w: wait, reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -566,6 +568,7 @@ func TestDoProposalCancelled(t *testing.T) { func TestDoProposalTimeout(t *testing.T) { srv := &EtcdServer{ + cfg: &ServerConfig{TickMs: 1}, r: raftNode{Node: &nodeRecorder{}}, w: &waitRecorder{}, reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -579,6 +582,7 @@ func TestDoProposalTimeout(t *testing.T) { func TestDoProposalStopped(t *testing.T) { srv := &EtcdServer{ + cfg: &ServerConfig{TickMs: 1}, r: raftNode{Node: &nodeRecorder{}}, w: &waitRecorder{}, reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -653,6 +657,7 @@ func TestSyncTrigger(t *testing.T) { n := newReadyNode() st := make(chan time.Time, 1) srv := &EtcdServer{ + cfg: &ServerConfig{TickMs: 1}, r: raftNode{ Node: n, raftStorage: raft.NewMemoryStorage(), @@ -733,6 +738,7 @@ func TestTriggerSnap(t 
*testing.T) { st := &storeRecorder{} p := &storageRecorder{} srv := &EtcdServer{ + cfg: &ServerConfig{TickMs: 1}, snapCount: uint64(snapc), r: raftNode{ Node: newNodeCommitter(), @@ -965,6 +971,7 @@ func TestPublish(t *testing.T) { ch <- Response{} w := &waitWithResponse{ch: ch} srv := &EtcdServer{ + cfg: &ServerConfig{TickMs: 1}, id: 1, r: raftNode{Node: n}, attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}, @@ -1006,6 +1013,7 @@ func TestPublish(t *testing.T) { // TestPublishStopped tests that publish will be stopped if server is stopped. func TestPublishStopped(t *testing.T) { srv := &EtcdServer{ + cfg: &ServerConfig{TickMs: 1}, r: raftNode{ Node: &nodeRecorder{}, transport: &nopTransporter{}, @@ -1024,6 +1032,7 @@ func TestPublishStopped(t *testing.T) { func TestPublishRetry(t *testing.T) { n := &nodeRecorder{} srv := &EtcdServer{ + cfg: &ServerConfig{TickMs: 1}, r: raftNode{Node: n}, w: &waitRecorder{}, done: make(chan struct{}), @@ -1047,6 +1056,7 @@ func TestUpdateVersion(t *testing.T) { w := &waitWithResponse{ch: ch} srv := &EtcdServer{ id: 1, + cfg: &ServerConfig{TickMs: 1}, r: raftNode{Node: n}, attributes: Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}}, cluster: &cluster{}, From c3d4d11402bdc80da669f147c86966c5901bf360 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 11 Aug 2015 21:22:05 -0700 Subject: [PATCH 2/2] etcdhttp: adjust request timeout based on config It uses the heartbeat interval and election timeout to estimate the expected request timeout. This PR helps etcd survive in high round-trip-time environments, e.g., globally deployed clusters. 
--- etcdmain/etcd.go | 2 +- etcdserver/config.go | 7 +++++++ etcdserver/etcdhttp/client.go | 10 ++++++---- etcdserver/etcdhttp/http.go | 6 ------ integration/cluster_test.go | 2 +- 5 files changed, 15 insertions(+), 12 deletions(-) diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 810e30835..28105c53d 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -275,7 +275,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { plog.Infof("cors = %s", cfg.corsInfo) } ch := &cors.CORSHandler{ - Handler: etcdhttp.NewClientHandler(s), + Handler: etcdhttp.NewClientHandler(s, srvcfg.ReqTimeout()), Info: cfg.corsInfo, } ph := etcdhttp.NewPeerHandler(s.Cluster(), s.RaftHandler()) diff --git a/etcdserver/config.go b/etcdserver/config.go index ba62c7087..57fc71723 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -111,6 +111,13 @@ func (c *ServerConfig) SnapDir() string { return path.Join(c.MemberDir(), "snap" func (c *ServerConfig) ShouldDiscover() bool { return c.DiscoveryURL != "" } +// ReqTimeout returns timeout for request to finish. +func (c *ServerConfig) ReqTimeout() time.Duration { + // CommitTimeout + // + 2 * election timeout for possible leader election + return c.CommitTimeout() + 2*time.Duration(c.ElectionTicks)*time.Duration(c.TickMs)*time.Millisecond +} + // CommitTimeout returns commit timeout under normal case. func (c *ServerConfig) CommitTimeout() time.Duration { // We assume that heartbeat >= TTL. diff --git a/etcdserver/etcdhttp/client.go b/etcdserver/etcdhttp/client.go index c59c1a5c5..fc498efc1 100644 --- a/etcdserver/etcdhttp/client.go +++ b/etcdserver/etcdhttp/client.go @@ -57,17 +57,17 @@ const ( ) // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests. 
-func NewClientHandler(server *etcdserver.EtcdServer) http.Handler { +func NewClientHandler(server *etcdserver.EtcdServer, timeout time.Duration) http.Handler { go capabilityLoop(server) - sec := auth.NewStore(server, defaultServerTimeout) + sec := auth.NewStore(server, timeout) kh := &keysHandler{ sec: sec, server: server, cluster: server.Cluster(), timer: server, - timeout: defaultServerTimeout, + timeout: timeout, } sh := &statsHandler{ @@ -78,6 +78,7 @@ func NewClientHandler(server *etcdserver.EtcdServer) http.Handler { sec: sec, server: server, cluster: server.Cluster(), + timeout: timeout, clock: clockwork.NewRealClock(), } @@ -176,6 +177,7 @@ type membersHandler struct { sec auth.Store server etcdserver.Server cluster etcdserver.Cluster + timeout time.Duration clock clockwork.Clock } @@ -189,7 +191,7 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String()) - ctx, cancel := context.WithTimeout(context.Background(), defaultServerTimeout) + ctx, cancel := context.WithTimeout(context.Background(), h.timeout) defer cancel() switch r.Method { diff --git a/etcdserver/etcdhttp/http.go b/etcdserver/etcdhttp/http.go index 1393162d2..ffa72881b 100644 --- a/etcdserver/etcdhttp/http.go +++ b/etcdserver/etcdhttp/http.go @@ -28,12 +28,6 @@ import ( ) const ( - // time to wait for response from EtcdServer requests - // 5s for disk and network delay + 10*heartbeat for commit and possible - // leader switch - // TODO: use heartbeat set in etcdserver - defaultServerTimeout = 5*time.Second + 10*(100*time.Millisecond) - // time to wait for a Watch request defaultWatchTimeout = time.Duration(math.MaxInt64) ) diff --git a/integration/cluster_test.go b/integration/cluster_test.go index 892cab6c5..fa3ebf2aa 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -757,7 +757,7 @@ func (m *member) Launch() error { for _, ln := range m.ClientListeners { hs := &httptest.Server{ 
Listener: ln, - Config: &http.Server{Handler: etcdhttp.NewClientHandler(m.s)}, + Config: &http.Server{Handler: etcdhttp.NewClientHandler(m.s, m.ServerConfig.ReqTimeout())}, } hs.Start() m.hss = append(m.hss, hs)