diff --git a/embed/config.go b/embed/config.go
index 719d0680d..5e51c314b 100644
--- a/embed/config.go
+++ b/embed/config.go
@@ -153,6 +153,7 @@ func NewConfig() *Config {
 		ACUrls:              []url.URL{*acurl},
 		ClusterState:        ClusterStateFlagNew,
 		InitialClusterToken: "etcd-cluster",
+		StrictReconfigCheck: true,
 	}
 	cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
 	return cfg
diff --git a/etcdserver/api/v2http/http.go b/etcdserver/api/v2http/http.go
index cad9ec80d..c22d81436 100644
--- a/etcdserver/api/v2http/http.go
+++ b/etcdserver/api/v2http/http.go
@@ -60,7 +60,7 @@ func writeError(w http.ResponseWriter, r *http.Request, err error) {
 		}
 	default:
 		switch err {
-		case etcdserver.ErrTimeoutDueToLeaderFail, etcdserver.ErrTimeoutDueToConnectionLost, etcdserver.ErrNotEnoughStartedMembers:
+		case etcdserver.ErrTimeoutDueToLeaderFail, etcdserver.ErrTimeoutDueToConnectionLost, etcdserver.ErrNotEnoughStartedMembers, etcdserver.ErrUnhealthy:
 			mlog.MergeError(err)
 		default:
 			mlog.MergeErrorf("got unexpected response error (%v)", err)
diff --git a/etcdserver/errors.go b/etcdserver/errors.go
index 9c74b7195..032960aff 100644
--- a/etcdserver/errors.go
+++ b/etcdserver/errors.go
@@ -32,6 +32,7 @@ var (
 	ErrNoSpace                    = errors.New("etcdserver: no space")
 	ErrInvalidAuthToken           = errors.New("etcdserver: invalid auth token")
 	ErrTooManyRequests            = errors.New("etcdserver: too many requests")
+	ErrUnhealthy                  = errors.New("etcdserver: unhealthy cluster")
 )
 
 type DiscoveryError struct {
diff --git a/etcdserver/server.go b/etcdserver/server.go
index c0732b883..9e0fb7509 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -65,6 +65,10 @@ const (
 	StoreClusterPrefix = "/0"
 	StoreKeysPrefix    = "/1"
 
+	// HealthInterval is the minimum time the cluster should be healthy
+	// before accepting add member requests.
+	HealthInterval = 5 * time.Second
+
 	purgeFileInterval = 30 * time.Second
 	// monitorVersionInterval should be smaller than the timeout
 	// on the connection. Or we will not be able to reuse the connection
@@ -814,10 +818,16 @@ func (s *EtcdServer) LeaderStats() []byte {
 func (s *EtcdServer) StoreStats() []byte { return s.store.JsonStats() }
 
 func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) error {
-	if s.Cfg.StrictReconfigCheck && !s.cluster.IsReadyToAddNewMember() {
-		// If s.cfg.StrictReconfigCheck is false, it means the option --strict-reconfig-check isn't passed to etcd.
-		// In such a case adding a new member is allowed unconditionally
-		return ErrNotEnoughStartedMembers
+	if s.Cfg.StrictReconfigCheck {
+		// by default StrictReconfigCheck is enabled; reject new members if unhealthy
+		if !s.cluster.IsReadyToAddNewMember() {
+			plog.Warningf("not enough started members, rejecting member add %+v", memb)
+			return ErrNotEnoughStartedMembers
+		}
+		if !isConnectedFullySince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), s.cluster.Members()) {
+			plog.Warningf("not healthy for reconfigure, rejecting member add %+v", memb)
+			return ErrUnhealthy
+		}
 	}
 
 	// TODO: move Member to protobuf type
diff --git a/etcdserver/util.go b/etcdserver/util.go
index 32c161743..5a3fd81ee 100644
--- a/etcdserver/util.go
+++ b/etcdserver/util.go
@@ -40,3 +40,14 @@ func isConnectedSince(transport rafthttp.Transporter, since time.Time, remote ty
 	t := transport.ActiveSince(remote)
 	return !t.IsZero() && t.Before(since)
 }
+
+// isConnectedFullySince checks whether the local member is connected to all
+// members in the cluster since the given time.
+func isConnectedFullySince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) bool {
+	for _, m := range members {
+		if m.ID != self && !isConnectedSince(transport, since, m.ID) {
+			return false
+		}
+	}
+	return true
+}
diff --git a/integration/cluster_test.go b/integration/cluster_test.go
index 1ecc3cd79..ecf187954 100644
--- a/integration/cluster_test.go
+++ b/integration/cluster_test.go
@@ -20,10 +20,12 @@ import (
 	"math/rand"
 	"os"
 	"strconv"
+	"strings"
 	"testing"
 	"time"
 
 	"github.com/coreos/etcd/client"
+	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/pkg/testutil"
 
 	"golang.org/x/net/context"
@@ -346,6 +348,49 @@ func TestIssue3699(t *testing.T) {
 	cancel()
 }
 
+// TestRejectUnhealthyAdd ensures an unhealthy cluster rejects adding members.
+func TestRejectUnhealthyAdd(t *testing.T) {
+	defer testutil.AfterTest(t)
+	c := NewCluster(t, 3)
+	for _, m := range c.Members {
+		m.ServerConfig.StrictReconfigCheck = true
+	}
+	c.Launch(t)
+	defer c.Terminate(t)
+
+	// make cluster unhealthy and wait for downed peer
+	c.Members[0].Stop(t)
+	c.WaitLeader(t)
+
+	// all attempts to add member should fail
+	for i := 1; i < len(c.Members); i++ {
+		err := c.addMemberByURL(t, c.URL(i), "unix://foo:12345")
+		if err == nil {
+			t.Fatalf("should have failed adding peer")
+		}
+		// TODO: client should return descriptive error codes for internal errors
+		if !strings.Contains(err.Error(), "has no leader") {
+			t.Errorf("unexpected error (%v)", err)
+		}
+	}
+
+	// make cluster healthy
+	c.Members[0].Restart(t)
+	c.WaitLeader(t)
+	time.Sleep(2 * etcdserver.HealthInterval)
+
+	// add member should succeed now that it's healthy
+	var err error
+	for i := 1; i < len(c.Members); i++ {
+		if err = c.addMemberByURL(t, c.URL(i), "unix://foo:12345"); err == nil {
+			break
+		}
+	}
+	if err != nil {
+		t.Fatalf("should have added peer to healthy cluster (%v)", err)
+	}
+}
+
 // clusterMustProgress ensures that cluster can make progress. It creates
 // a random key first, and check the new key could be got from all client urls
 // of the cluster.