diff --git a/integration/cluster.go b/integration/cluster.go
index e5aa0bc59..3f24bbd07 100644
--- a/integration/cluster.go
+++ b/integration/cluster.go
@@ -336,6 +336,7 @@ func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) {
 
 func (c *cluster) WaitLeader(t *testing.T) int { return c.waitLeader(t, c.Members) }
 
+// waitLeader waits until given members agree on the same leader.
 func (c *cluster) waitLeader(t *testing.T, membs []*member) int {
 	possibleLead := make(map[uint64]bool)
 	var lead uint64
@@ -369,6 +370,32 @@ func (c *cluster) waitLeader(t *testing.T, membs []*member) int {
 	return -1
 }
 
+// WaitNoLeader waits until no member of the cluster reports a leader.
+func (c *cluster) WaitNoLeader(t *testing.T) { c.waitNoLeader(t, c.Members) }
+
+// waitNoLeader blocks until none of the given members reports a leader,
+// i.e. until Lead() returns 0 for each of them. Members whose StopNotify
+// channel is ready are skipped. There is no timeout; t is currently unused.
+func (c *cluster) waitNoLeader(t *testing.T, membs []*member) {
+	noLeader := false
+	for !noLeader {
+		noLeader = true
+		for _, m := range membs {
+			select {
+			case <-m.s.StopNotify():
+				continue
+			default:
+			}
+			if m.s.Lead() != 0 {
+				noLeader = false
+				// A leader is still known; back off, then re-check every member.
+				time.Sleep(10 * tickDuration)
+				break
+			}
+		}
+	}
+}
+
 func (c *cluster) waitVersion() {
 	for _, m := range c.Members {
 		for {
@@ -502,6 +529,13 @@ func (m *member) listenGRPC() error {
 	return nil
 }
 
+// electionTimeout returns the member's election timeout as a duration.
+// ElectionTicks is a number of ticks, not milliseconds, so it is scaled
+// by the configured tick interval (TickMs).
+func (m *member) electionTimeout() time.Duration {
+	return time.Duration(m.s.Cfg.ElectionTicks*int(m.s.Cfg.TickMs)) * time.Millisecond
+}
+
 func (m *member) DropConnections() { m.grpcBridge.Reset() }
 
 // NewClientV3 creates a new grpc client connection to the member
@@ -741,6 +775,22 @@ func (m *member) Metric(metricName string) (string, error) {
 	return "", nil
 }
 
+// InjectPartition cuts the peer connections between m and each of others, in both directions.
+func (m *member) InjectPartition(t *testing.T, others []*member) {
+	for _, other := range others {
+		m.s.CutPeer(other.s.ID())
+		other.s.CutPeer(m.s.ID())
+	}
+}
+
+// RecoverPartition mends the peer connections between m and each of others, in both directions.
+func (m *member) RecoverPartition(t *testing.T, others []*member) {
+	for _, other := range others {
+		m.s.MendPeer(other.s.ID())
+		other.s.MendPeer(m.s.ID())
+	}
+}
+
 func MustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client {
 	cfgtls := transport.TLSInfo{}
 	if tls != nil {
diff --git a/integration/network_partition_test.go b/integration/network_partition_test.go
new file mode 100644
index 000000000..0656bb00c
--- /dev/null
+++ b/integration/network_partition_test.go
@@ -0,0 +1,148 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package integration
+
+import (
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/pkg/testutil"
+)
+
+// TestNetworkPartition5MembersLeaderInMinority partitions a 5-member cluster
+// so that the leader lands in the 2-member group, then waits for that group
+// to lose its leader and for the 3-member group to agree on one.
+func TestNetworkPartition5MembersLeaderInMinority(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := NewClusterV3(t, &ClusterConfig{Size: 5})
+	defer clus.Terminate(t)
+
+	leadIndex := clus.WaitLeader(t)
+
+	// minority: leader, follower / majority: follower, follower, follower
+	minority := []int{leadIndex, (leadIndex + 1) % 5}
+	majority := []int{(leadIndex + 2) % 5, (leadIndex + 3) % 5, (leadIndex + 4) % 5}
+
+	minorityMembers := getMembersByIndexSlice(clus.cluster, minority)
+	majorityMembers := getMembersByIndexSlice(clus.cluster, majority)
+
+	// network partition (bi-directional)
+	injectPartition(t, minorityMembers, majorityMembers)
+
+	// minority leader must be lost
+	clus.waitNoLeader(t, minorityMembers)
+
+	// wait extra election timeout
+	time.Sleep(2 * majorityMembers[0].electionTimeout())
+
+	// new leader must be from majority
+	clus.waitLeader(t, majorityMembers)
+
+	// recover network partition (bi-directional)
+	recoverPartition(t, minorityMembers, majorityMembers)
+	clusterMustProgress(t, clus.Members)
+}
+
+// TestNetworkPartition5MembersLeaderInMajority partitions a 5-member cluster
+// so that the leader stays in the 3-member group, then fails the test if the
+// leader agreed on by that group differs from the leader before the partition.
+func TestNetworkPartition5MembersLeaderInMajority(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := NewClusterV3(t, &ClusterConfig{Size: 5})
+	defer clus.Terminate(t)
+
+	leadIndex := clus.WaitLeader(t)
+
+	// majority: leader, follower, follower / minority: follower, follower
+	majority := []int{leadIndex, (leadIndex + 1) % 5, (leadIndex + 2) % 5}
+	minority := []int{(leadIndex + 3) % 5, (leadIndex + 4) % 5}
+
+	majorityMembers := getMembersByIndexSlice(clus.cluster, majority)
+	minorityMembers := getMembersByIndexSlice(clus.cluster, minority)
+
+	// network partition (bi-directional)
+	injectPartition(t, majorityMembers, minorityMembers)
+
+	// minority members must lose their leader
+	clus.waitNoLeader(t, minorityMembers)
+
+	// wait extra election timeout
+	time.Sleep(2 * majorityMembers[0].electionTimeout())
+
+	// leader must be held in majority
+	leadIndex2 := clus.waitLeader(t, majorityMembers)
+	leadID, leadID2 := clus.Members[leadIndex].s.ID(), majorityMembers[leadIndex2].s.ID()
+	if leadID != leadID2 {
+		t.Fatalf("unexpected leader change from %s, got %s", leadID, leadID2)
+	}
+
+	// recover network partition (bi-directional)
+	recoverPartition(t, majorityMembers, minorityMembers)
+	clusterMustProgress(t, clus.Members)
+}
+
+// TestNetworkPartition4Members splits a 4-member cluster into two groups of
+// two, waits until every member reports no leader, then mends the partition
+// and calls clusterMustProgress on all members.
+func TestNetworkPartition4Members(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := NewClusterV3(t, &ClusterConfig{Size: 4})
+	defer clus.Terminate(t)
+
+	leadIndex := clus.WaitLeader(t)
+
+	// groupA: leader, follower / groupB: follower, follower
+	groupA := []int{leadIndex, (leadIndex + 1) % 4}
+	groupB := []int{(leadIndex + 2) % 4, (leadIndex + 3) % 4}
+
+	leaderPartition := getMembersByIndexSlice(clus.cluster, groupA)
+	followerPartition := getMembersByIndexSlice(clus.cluster, groupB)
+
+	// network partition (bi-directional)
+	injectPartition(t, leaderPartition, followerPartition)
+
+	// no group has quorum, so leader must be lost in all members
+	clus.WaitNoLeader(t)
+
+	// recover network partition (bi-directional)
+	recoverPartition(t, leaderPartition, followerPartition)
+	clusterMustProgress(t, clus.Members)
+}
+
+// getMembersByIndexSlice returns clus.Members[idx] for each idx in idxs, in the same order.
+func getMembersByIndexSlice(clus *cluster, idxs []int) []*member {
+	ms := make([]*member, len(idxs))
+	for i, idx := range idxs {
+		ms[i] = clus.Members[idx]
+	}
+	return ms
+}
+
+// injectPartition calls InjectPartition(t, others) on every member of src.
+func injectPartition(t *testing.T, src, others []*member) {
+	for _, m := range src {
+		m.InjectPartition(t, others)
+	}
+}
+
+// recoverPartition calls RecoverPartition(t, others) on every member of src.
+func recoverPartition(t *testing.T, src, others []*member) {
+	for _, m := range src {
+		m.RecoverPartition(t, others)
+	}
+}