https://github.com/etcd-io/etcd.git
Merge pull request #6156 from heyitsanthony/remove-member-quorum
etcdserver: reject member removal that breaks active quorum
commit 965b2901d5
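This change makes member removal quorum-aware on the server side: with strict reconfig checking enabled (the default), RemoveMember is rejected when the target member is part of the active quorum and removing it would leave fewer active members than a majority of the shrunken cluster. The standalone Go sketch below summarizes that decision flow; the function and parameter names are illustrative stand-ins, not etcd's actual API (the real logic is in the etcdserver hunks further down).

package main

import (
	"errors"
	"fmt"
)

// mayRemove is an illustrative stand-in for the check added by this commit.
// strictReconfig mirrors --strict-reconfig-check, memberIsActive says whether
// the member being removed is currently connected, and activeMembers/clusterSize
// describe the cluster as a whole.
func mayRemove(strictReconfig, memberIsActive bool, activeMembers, clusterSize int) error {
	if !strictReconfig {
		return nil // check disabled: removal allowed unconditionally
	}
	if !memberIsActive {
		return nil // a downed member is not part of the active quorum
	}
	// removing an active member must still leave a quorum of the shrunken cluster
	if activeMembers-1 < 1+((clusterSize-1)/2) {
		return errors.New("reconfigure breaks active quorum")
	}
	return nil
}

func main() {
	fmt.Println(mayRemove(false, true, 2, 5)) // <nil>: check disabled
	fmt.Println(mayRemove(true, false, 3, 5)) // <nil>: removing a downed member
	fmt.Println(mayRemove(true, true, 3, 5))  // error: would break the active quorum
}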
@@ -24,8 +24,10 @@ import (
 	"github.com/coreos/etcd/etcdserver/etcdserverpb"
 )
 
-func TestCtlV3MemberList(t *testing.T) { testCtl(t, memberListTest) }
-func TestCtlV3MemberRemove(t *testing.T) { testCtl(t, memberRemoveTest, withQuorum()) }
+func TestCtlV3MemberList(t *testing.T) { testCtl(t, memberListTest) }
+func TestCtlV3MemberRemove(t *testing.T) {
+	testCtl(t, memberRemoveTest, withQuorum(), withNoStrictReconfig())
+}
 func TestCtlV3MemberAdd(t *testing.T)    { testCtl(t, memberAddTest) }
 func TestCtlV3MemberUpdate(t *testing.T) { testCtl(t, memberUpdateTest) }
@@ -41,6 +41,7 @@ type ctlCtx struct {
 	t   *testing.T
 	cfg etcdProcessClusterConfig
 	quotaBackendBytes int64
+	noStrictReconfig  bool
 
 	epc *etcdProcessCluster
@@ -88,6 +89,10 @@ func withCompactPhysical() ctlOption {
 	return func(cx *ctlCtx) { cx.compactPhysical = true }
 }
 
+func withNoStrictReconfig() ctlOption {
+	return func(cx *ctlCtx) { cx.noStrictReconfig = true }
+}
+
 func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
 	defer testutil.AfterTest(t)
 
@@ -106,6 +111,7 @@ func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
 	if ret.quotaBackendBytes > 0 {
 		ret.cfg.quotaBackendBytes = ret.quotaBackendBytes
 	}
+	ret.cfg.noStrictReconfig = ret.noStrictReconfig
 
 	epc, err := newEtcdProcessCluster(&ret.cfg)
 	if err != nil {
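The withNoStrictReconfig helper and the noStrictReconfig field above follow the functional-option pattern the e2e harness already uses for withQuorum and withCompactPhysical: each option is a closure that flips a field on the shared ctlCtx before the cluster is configured. A minimal self-contained sketch of that pattern, with stand-in types rather than the real e2e ones:

package main

import "fmt"

// ctlCtx and ctlOption are simplified stand-ins for the e2e test types.
type ctlCtx struct {
	quorum           bool
	noStrictReconfig bool
}

type ctlOption func(*ctlCtx)

func withQuorum() ctlOption           { return func(cx *ctlCtx) { cx.quorum = true } }
func withNoStrictReconfig() ctlOption { return func(cx *ctlCtx) { cx.noStrictReconfig = true } }

// newCtx applies each option in order, the same way testCtl applies its opts.
func newCtx(opts ...ctlOption) ctlCtx {
	var cx ctlCtx
	for _, o := range opts {
		o(&cx)
	}
	return cx
}

func main() {
	fmt.Printf("%+v\n", newCtx(withQuorum(), withNoStrictReconfig()))
}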
@@ -165,6 +165,7 @@ type etcdProcessClusterConfig struct {
 	forceNewCluster   bool
 	initialToken      string
 	quotaBackendBytes int64
+	noStrictReconfig  bool
 }
 
 // newEtcdProcessCluster launches a new cluster from etcd processes, returning
@@ -285,6 +286,9 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
 			"--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
 		)
 	}
+	if cfg.noStrictReconfig {
+		args = append(args, "--strict-reconfig-check=false")
+	}
 
 	args = append(args, cfg.tlsArgs()...)
 	etcdCfgs[i] = &etcdProcessConfig{
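The hunk above is where the new noStrictReconfig setting becomes a real server flag: the e2e harness appends --strict-reconfig-check=false (the check is on by default, as noted in the server-side change below) when building each member's argument list. A rough, self-contained sketch of that argument construction; only the flag itself is taken from the diff, the rest is illustrative:

package main

import "fmt"

// buildArgs mimics how the e2e harness conditionally disables the strict
// reconfig check when assembling etcd process arguments.
func buildArgs(noStrictReconfig bool) []string {
	args := []string{"--name", "infra0"} // placeholder base arguments
	if noStrictReconfig {
		args = append(args, "--strict-reconfig-check=false")
	}
	return args
}

func main() {
	fmt.Println(buildArgs(true)) // [--name infra0 --strict-reconfig-check=false]
}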
@@ -844,10 +844,9 @@ func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) error {
 }
 
 func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) error {
-	if s.Cfg.StrictReconfigCheck && !s.cluster.IsReadyToRemoveMember(id) {
-		// If s.cfg.StrictReconfigCheck is false, it means the option --strict-reconfig-check isn't passed to etcd.
-		// In such a case removing a member is allowed unconditionally
-		return ErrNotEnoughStartedMembers
+	// by default StrictReconfigCheck is enabled; reject removal if leads to quorum loss
+	if err := s.mayRemoveMember(types.ID(id)); err != nil {
+		return err
 	}
 
 	cc := raftpb.ConfChange{
@@ -857,6 +856,32 @@ func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) error {
 	return s.configure(ctx, cc)
 }
 
+func (s *EtcdServer) mayRemoveMember(id types.ID) error {
+	if !s.Cfg.StrictReconfigCheck {
+		return nil
+	}
+
+	if !s.cluster.IsReadyToRemoveMember(uint64(id)) {
+		plog.Warningf("not enough started members, rejecting remove member %s", id)
+		return ErrNotEnoughStartedMembers
+	}
+
+	// downed member is safe to remove since it's not part of the active quorum
+	if t := s.r.transport.ActiveSince(id); id != s.ID() && t.IsZero() {
+		return nil
+	}
+
+	// protect quorum if some members are down
+	m := s.cluster.Members()
+	active := numConnectedSince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), m)
+	if (active - 1) < 1+((len(m)-1)/2) {
+		plog.Warningf("reconfigure breaks active quorum, rejecting remove member %s", id)
+		return ErrUnhealthy
+	}
+
+	return nil
+}
+
 func (s *EtcdServer) UpdateMember(ctx context.Context, memb membership.Member) error {
 	b, err := json.Marshal(memb)
 	if err != nil {
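The inequality in mayRemoveMember, (active - 1) < 1+((len(m)-1)/2), asks whether removing one active member would leave fewer active members than a majority of the cluster after it shrinks by one. A small runnable example of that arithmetic (a sketch, not etcd code):

package main

import "fmt"

// quorumSafeRemove reports whether removing one active member of a
// clusterSize-member cluster, of which `active` are currently connected,
// still leaves a quorum; it mirrors the inequality in mayRemoveMember.
func quorumSafeRemove(active, clusterSize int) bool {
	quorumAfter := 1 + ((clusterSize - 1) / 2) // majority of the shrunken cluster
	return active-1 >= quorumAfter
}

func main() {
	// 5 members with only 3 active: 3-1 = 2 left, but a 4-member cluster needs 3 -> rejected.
	fmt.Println(quorumSafeRemove(3, 5)) // false
	// 5 members all active: 5-1 = 4 left, quorum of 4 is 3 -> allowed.
	fmt.Println(quorumSafeRemove(5, 5)) // true
}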
@@ -25,13 +25,7 @@ import (
 // isConnectedToQuorumSince checks whether the local member is connected to the
 // quorum of the cluster since the given time.
 func isConnectedToQuorumSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) bool {
-	var connectedNum int
-	for _, m := range members {
-		if m.ID == self || isConnectedSince(transport, since, m.ID) {
-			connectedNum++
-		}
-	}
-	return connectedNum >= (len(members)+1)/2
+	return numConnectedSince(transport, since, self, members) >= (len(members)/2)+1
 }
 
 // isConnectedSince checks whether the local member is connected to the
@@ -44,10 +38,17 @@ func isConnectedSince(transport rafthttp.Transporter, since time.Time, remote types.ID) bool {
 // isConnectedFullySince checks whether the local member is connected to all
 // members in the cluster since the given time.
 func isConnectedFullySince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) bool {
+	return numConnectedSince(transport, since, self, members) == len(members)
+}
+
+// numConnectedSince counts how many members are connected to the local member
+// since the given time.
+func numConnectedSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) int {
+	connectedNum := 0
 	for _, m := range members {
-		if m.ID != self && !isConnectedSince(transport, since, m.ID) {
-			return false
+		if m.ID == self || isConnectedSince(transport, since, m.ID) {
+			connectedNum++
 		}
 	}
-	return true
+	return connectedNum
 }
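The refactor above extracts the peer-counting loop into numConnectedSince so the quorum predicate and the new full-connectivity predicate share it, and it tightens the quorum test to len(members)/2+1, a strict majority even for even-sized clusters (the old (len(members)+1)/2 accepted 2 of 4). Below is a self-contained sketch of the counting idea, substituting a plain map of last-active timestamps for the raft transport:

package main

import (
	"fmt"
	"time"
)

type member struct{ id uint64 }

// connectedSince mimics isConnectedSince: a peer counts if its transport has a
// non-zero active timestamp that predates `since`.
func connectedSince(activeSince map[uint64]time.Time, since time.Time, id uint64) bool {
	t := activeSince[id]
	return !t.IsZero() && t.Before(since)
}

// numConnectedSince counts the local member plus every peer connected since `since`.
func numConnectedSince(activeSince map[uint64]time.Time, since time.Time, self uint64, members []member) int {
	n := 0
	for _, m := range members {
		if m.id == self || connectedSince(activeSince, since, m.id) {
			n++
		}
	}
	return n
}

func main() {
	now := time.Now()
	members := []member{{1}, {2}, {3}, {4}, {5}}
	// peers 2 and 3 have been up for a minute; 4 and 5 are down (zero timestamp)
	active := map[uint64]time.Time{2: now.Add(-time.Minute), 3: now.Add(-time.Minute)}
	n := numConnectedSince(active, now, 1, members)
	fmt.Println(n, n >= len(members)/2+1) // 3 true: self plus two peers is a majority of 5
}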
@@ -276,12 +276,18 @@ func (c *cluster) AddMember(t *testing.T) {
 }
 
 func (c *cluster) RemoveMember(t *testing.T, id uint64) {
+	if err := c.removeMember(t, id); err != nil {
+		t.Fatal(err)
+	}
+}
+
+func (c *cluster) removeMember(t *testing.T, id uint64) error {
 	// send remove request to the cluster
 	cc := MustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS)
 	ma := client.NewMembersAPI(cc)
 	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
 	if err := ma.Remove(ctx, types.ID(id).String()); err != nil {
-		t.Fatalf("unexpected remove error %v", err)
+		return err
 	}
 	cancel()
 	newMembers := make([]*member, 0)
@@ -302,6 +308,7 @@ func (c *cluster) RemoveMember(t *testing.T, id uint64) {
 	}
 	c.Members = newMembers
 	c.waitMembersMatch(t, c.HTTPMembers())
+	return nil
 }
 
 func (c *cluster) Terminate(t *testing.T) {
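Splitting the integration helper into a fatal RemoveMember wrapper plus an error-returning removeMember keeps existing callers unchanged while letting the new test below assert that a removal was rejected rather than failing on the first error. A small sketch of the same wrapper pattern with stand-in types (the error text here is illustrative, not etcd's):

package main

import (
	"errors"
	"fmt"
)

type cluster struct{ healthy bool }

// removeMember returns the error so callers can assert on a rejection.
func (c *cluster) removeMember(id uint64) error {
	if !c.healthy {
		return errors.New("remove rejected: would break active quorum")
	}
	return nil
}

// RemoveMember keeps the old fail-fast behavior; the real helper calls t.Fatal(err).
func (c *cluster) RemoveMember(id uint64) {
	if err := c.removeMember(id); err != nil {
		panic(err)
	}
}

func main() {
	c := &cluster{healthy: false}
	fmt.Println(c.removeMember(7) != nil) // true: the caller can inspect the rejection
}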
@@ -391,6 +391,52 @@ func TestRejectUnhealthyAdd(t *testing.T) {
 	}
 }
 
+// TestRejectUnhealthyRemove ensures an unhealthy cluster rejects removing members
+// if quorum will be lost.
+func TestRejectUnhealthyRemove(t *testing.T) {
+	defer testutil.AfterTest(t)
+	c := NewCluster(t, 5)
+	for _, m := range c.Members {
+		m.ServerConfig.StrictReconfigCheck = true
+	}
+	c.Launch(t)
+	defer c.Terminate(t)
+
+	// make cluster unhealthy and wait for downed peer; (3 up, 2 down)
+	c.Members[0].Stop(t)
+	c.Members[1].Stop(t)
+	c.WaitLeader(t)
+
+	// reject remove active member since (3,2)-(1,0) => (2,2) lacks quorum
+	err := c.removeMember(t, uint64(c.Members[2].s.ID()))
+	if err == nil {
+		t.Fatalf("should reject quorum breaking remove")
+	}
+	// TODO: client should return more descriptive error codes for internal errors
+	if !strings.Contains(err.Error(), "has no leader") {
+		t.Errorf("unexpected error (%v)", err)
+	}
+
+	// member stopped after launch; wait for missing heartbeats
+	time.Sleep(time.Duration(electionTicks * int(tickDuration)))
+
+	// permit remove dead member since (3,2) - (0,1) => (3,1) has quorum
+	if err = c.removeMember(t, uint64(c.Members[0].s.ID())); err != nil {
+		t.Fatalf("should accept removing down member")
+	}
+
+	// bring cluster to (4,1)
+	c.Members[0].Restart(t)
+
+	// restarted member must be connected for a HealthInterval before remove is accepted
+	time.Sleep((3 * etcdserver.HealthInterval) / 2)
+
+	// accept remove member since (4,1)-(1,0) => (3,1) has quorum
+	if err = c.removeMember(t, uint64(c.Members[0].s.ID())); err != nil {
+		t.Fatalf("expected to remove member, got error %v", err)
+	}
+}
+
 // clusterMustProgress ensures that cluster can make progress. It creates
 // a random key first, and check the new key could be got from all client urls
 // of the cluster.
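The (up, down) pairs in the test comments above track active versus stopped members in the 5-member cluster: removing an active member from (3,2) yields (2,2), and 2 of 4 is below the quorum of 3, so it is rejected; removing a dead member yields (3,1), and 3 of 4 meets quorum, so it is allowed. A quick arithmetic check of those two cases (illustrative only):

package main

import "fmt"

// quorum returns the majority size of an n-member cluster.
func quorum(n int) int { return n/2 + 1 }

func main() {
	// (3 up, 2 down) minus one active member -> 2 up of 4 total: rejected.
	fmt.Println(2 >= quorum(4)) // false
	// (3 up, 2 down) minus one dead member -> 3 up of 4 total: allowed.
	fmt.Println(3 >= quorum(4)) // true
}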