From 5a91937367840dd4b376ba176593cfad7497759f Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 11 Aug 2015 21:09:03 -0700 Subject: [PATCH] etcdserver: adjust commit timeout based on config It uses heartbeat interval and election timeout to estimate the commit timeout for internal requests. This PR helps etcd survive under high roundtrip-time environment, e.g., globally-deployed cluster. --- etcdserver/config.go | 9 +++++++++ etcdserver/raft.go | 2 +- etcdserver/server.go | 10 +++------- etcdserver/server_test.go | 10 ++++++++++ 4 files changed, 23 insertions(+), 8 deletions(-) diff --git a/etcdserver/config.go b/etcdserver/config.go index b19eae375..ba62c7087 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -20,6 +20,7 @@ import ( "path" "reflect" "sort" + "time" "github.com/coreos/etcd/pkg/types" ) @@ -110,6 +111,14 @@ func (c *ServerConfig) SnapDir() string { return path.Join(c.MemberDir(), "snap" func (c *ServerConfig) ShouldDiscover() bool { return c.DiscoveryURL != "" } +// CommitTimeout returns commit timeout under normal case. +func (c *ServerConfig) CommitTimeout() time.Duration { + // We assume that heartbeat >= TTL. + // 5s for queue waiting, computation and disk IO delay + // + 2 * heartbeat(TTL) for expected time between proposal by follower and commit at the follower + return 5*time.Second + 2*time.Duration(c.TickMs)*time.Millisecond +} + func (c *ServerConfig) PrintWithInitial() { c.print(true) } func (c *ServerConfig) Print() { c.print(false) } diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 16838af03..336be62bc 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -175,7 +175,7 @@ func (r *raftNode) start(s *EtcdServer) { } r.Advance() case <-syncC: - r.s.sync(defaultSyncTimeout) + r.s.sync(r.s.cfg.CommitTimeout()) case <-r.stopped: return } diff --git a/etcdserver/server.go b/etcdserver/server.go index 995912a10..fe3fcd49f 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -54,17 +54,13 @@ const ( // owner can make/remove files inside the directory privateDirMode = 0700 - defaultSyncTimeout = time.Second - DefaultSnapCount = 10000 - // TODO: calculate based on heartbeat interval - defaultPublishTimeout = 5 * time.Second + DefaultSnapCount = 10000 StoreClusterPrefix = "/0" StoreKeysPrefix = "/1" purgeFileInterval = 30 * time.Second monitorVersionInterval = 5 * time.Second - versionUpdateTimeout = 1 * time.Second ) var ( @@ -347,7 +343,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { // It also starts a goroutine to publish its server information. func (s *EtcdServer) Start() { s.start() - go s.publish(defaultPublishTimeout) + go s.publish(s.cfg.CommitTimeout()) go s.purgeFile() go monitorFileDescriptor(s.done) go s.monitorVersions() @@ -1005,7 +1001,7 @@ func (s *EtcdServer) updateClusterVersion(ver string) { Path: path.Join(StoreClusterPrefix, "version"), Val: ver, } - ctx, cancel := context.WithTimeout(context.Background(), versionUpdateTimeout) + ctx, cancel := context.WithTimeout(context.Background(), s.cfg.CommitTimeout()) _, err := s.Do(ctx, req) cancel() switch err { diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index d9d93ba5c..f1cc36e01 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -517,6 +517,7 @@ func TestDoProposal(t *testing.T) { for i, tt := range tests { st := &storeRecorder{} srv := &EtcdServer{ + cfg: &ServerConfig{TickMs: 1}, r: raftNode{ Node: newNodeCommitter(), storage: &storageRecorder{}, @@ -547,6 +548,7 @@ func TestDoProposal(t *testing.T) { func TestDoProposalCancelled(t *testing.T) { wait := &waitRecorder{} srv := &EtcdServer{ + cfg: &ServerConfig{TickMs: 1}, r: raftNode{Node: &nodeRecorder{}}, w: wait, reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -566,6 +568,7 @@ func TestDoProposalCancelled(t *testing.T) { func TestDoProposalTimeout(t *testing.T) { srv := &EtcdServer{ + cfg: &ServerConfig{TickMs: 1}, r: raftNode{Node: &nodeRecorder{}}, w: &waitRecorder{}, reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -579,6 +582,7 @@ func TestDoProposalTimeout(t *testing.T) { func TestDoProposalStopped(t *testing.T) { srv := &EtcdServer{ + cfg: &ServerConfig{TickMs: 1}, r: raftNode{Node: &nodeRecorder{}}, w: &waitRecorder{}, reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -653,6 +657,7 @@ func TestSyncTrigger(t *testing.T) { n := newReadyNode() st := make(chan time.Time, 1) srv := &EtcdServer{ + cfg: &ServerConfig{TickMs: 1}, r: raftNode{ Node: n, raftStorage: raft.NewMemoryStorage(), @@ -733,6 +738,7 @@ func TestTriggerSnap(t *testing.T) { st := &storeRecorder{} p := &storageRecorder{} srv := &EtcdServer{ + cfg: &ServerConfig{TickMs: 1}, snapCount: uint64(snapc), r: raftNode{ Node: newNodeCommitter(), @@ -965,6 +971,7 @@ func TestPublish(t *testing.T) { ch <- Response{} w := &waitWithResponse{ch: ch} srv := &EtcdServer{ + cfg: &ServerConfig{TickMs: 1}, id: 1, r: raftNode{Node: n}, attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}, @@ -1006,6 +1013,7 @@ func TestPublish(t *testing.T) { // TestPublishStopped tests that publish will be stopped if server is stopped. func TestPublishStopped(t *testing.T) { srv := &EtcdServer{ + cfg: &ServerConfig{TickMs: 1}, r: raftNode{ Node: &nodeRecorder{}, transport: &nopTransporter{}, @@ -1024,6 +1032,7 @@ func TestPublishStopped(t *testing.T) { func TestPublishRetry(t *testing.T) { n := &nodeRecorder{} srv := &EtcdServer{ + cfg: &ServerConfig{TickMs: 1}, r: raftNode{Node: n}, w: &waitRecorder{}, done: make(chan struct{}), @@ -1047,6 +1056,7 @@ func TestUpdateVersion(t *testing.T) { w := &waitWithResponse{ch: ch} srv := &EtcdServer{ id: 1, + cfg: &ServerConfig{TickMs: 1}, r: raftNode{Node: n}, attributes: Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}}, cluster: &cluster{},