diff --git a/tools/functional-tester/etcd-tester/cluster.go b/tools/functional-tester/etcd-tester/cluster.go index 5f6e001a6..0491337c3 100644 --- a/tools/functional-tester/etcd-tester/cluster.go +++ b/tools/functional-tester/etcd-tester/cluster.go @@ -30,10 +30,12 @@ import ( // agentConfig holds information needed to interact/configure an agent and its etcd process type agentConfig struct { - endpoint string - clientPort int - peerPort int - failpointPort int + endpoint string + clientPort int + advertiseClientPort int + peerPort int + advertisePeerPort int + failpointPort int } type cluster struct { @@ -61,12 +63,14 @@ func (c *cluster) bootstrap() error { return err } members[i] = &member{ - Agent: agent, - Endpoint: a.endpoint, - Name: fmt.Sprintf("etcd-%d", i), - ClientURL: fmt.Sprintf("http://%s:%d", host, a.clientPort), - PeerURL: fmt.Sprintf("http://%s:%d", host, a.peerPort), - FailpointURL: fmt.Sprintf("http://%s:%d", host, a.failpointPort), + Agent: agent, + Endpoint: a.endpoint, + Name: fmt.Sprintf("etcd-%d", i), + ClientURL: fmt.Sprintf("http://%s:%d", host, a.clientPort), + AdvertiseClientURL: fmt.Sprintf("http://%s:%d", host, a.advertiseClientPort), + PeerURL: fmt.Sprintf("http://%s:%d", host, a.peerPort), + AdvertisePeerURL: fmt.Sprintf("http://%s:%d", host, a.advertisePeerPort), + FailpointURL: fmt.Sprintf("http://%s:%d", host, a.failpointPort), } memberNameURLs[i] = members[i].ClusterEntry() } diff --git a/tools/functional-tester/etcd-tester/failure.go b/tools/functional-tester/etcd-tester/failure.go index cbc4a52ea..098c0e839 100644 --- a/tools/functional-tester/etcd-tester/failure.go +++ b/tools/functional-tester/etcd-tester/failure.go @@ -128,7 +128,10 @@ func (f *failureDelay) Inject(c *cluster, round int) error { if err := f.failure.Inject(c, round); err != nil { return err } - time.Sleep(f.delayDuration) + if f.delayDuration > 0 { + plog.Infof("sleeping delay duration %v for %q", f.delayDuration, f.failure.Desc()) + time.Sleep(f.delayDuration) + } return nil } diff --git a/tools/functional-tester/etcd-tester/failure_agent.go b/tools/functional-tester/etcd-tester/failure_agent.go index 5dddec530..49dff8ccd 100644 --- a/tools/functional-tester/etcd-tester/failure_agent.go +++ b/tools/functional-tester/etcd-tester/failure_agent.go @@ -24,6 +24,9 @@ const ( slowNetworkLatency = 500 // 500 millisecond randomVariation = 50 + // delay duration to trigger leader election (default election timeout 1s) + triggerElectionDur = 5 * time.Second + // Wait more when it recovers from slow network, because network layer // needs extra time to propagate traffic control (tc command) change. // Otherwise, we get different hash values from the previous revision. @@ -82,19 +85,27 @@ func injectDropPort(m *member) error { return m.Agent.DropPort(m.peerPort()) } func recoverDropPort(m *member) error { return m.Agent.RecoverPort(m.peerPort()) } func newFailureIsolate() failure { - return &failureOne{ + f := &failureOne{ description: "isolate one member", injectMember: injectDropPort, recoverMember: recoverDropPort, } + return &failureDelay{ + failure: f, + delayDuration: triggerElectionDur, + } } func newFailureIsolateAll() failure { - return &failureAll{ + f := &failureAll{ description: "isolate all members", injectMember: injectDropPort, recoverMember: recoverDropPort, } + return &failureDelay{ + failure: f, + delayDuration: triggerElectionDur, + } } func injectLatency(m *member) error { @@ -115,11 +126,15 @@ func recoverLatency(m *member) error { func newFailureSlowNetworkOneMember() failure { desc := fmt.Sprintf("slow down one member's network by adding %d ms latency", slowNetworkLatency) - return &failureOne{ + f := &failureOne{ description: description(desc), injectMember: injectLatency, recoverMember: recoverLatency, } + return &failureDelay{ + failure: f, + delayDuration: triggerElectionDur, + } } func newFailureSlowNetworkLeader() failure { @@ -129,15 +144,23 @@ func newFailureSlowNetworkLeader() failure { injectMember: injectLatency, recoverMember: recoverLatency, } - return &failureLeader{ff, 0} + f := &failureLeader{ff, 0} + return &failureDelay{ + failure: f, + delayDuration: triggerElectionDur, + } } func newFailureSlowNetworkAll() failure { - return &failureAll{ + f := &failureAll{ description: "slow down all members' network", injectMember: injectLatency, recoverMember: recoverLatency, } + return &failureDelay{ + failure: f, + delayDuration: triggerElectionDur, + } } func newFailureNop() failure { diff --git a/tools/functional-tester/etcd-tester/main.go b/tools/functional-tester/etcd-tester/main.go index 0197b1a95..5f5853206 100644 --- a/tools/functional-tester/etcd-tester/main.go +++ b/tools/functional-tester/etcd-tester/main.go @@ -41,7 +41,9 @@ const ( func main() { endpointStr := flag.String("agent-endpoints", "localhost:9027", "HTTP RPC endpoints of agents. Do not specify the schema.") clientPorts := flag.String("client-ports", "", "etcd client port for each agent endpoint") + advertiseClientPorts := flag.String("advertise-client-ports", "", "etcd advertise client port for each agent endpoint") peerPorts := flag.String("peer-ports", "", "etcd peer port for each agent endpoint") + advertisePeerPorts := flag.String("advertise-peer-ports", "", "etcd advertise peer port for each agent endpoint") failpointPorts := flag.String("failpoint-ports", "", "etcd failpoint port for each agent endpoint") stressKeyLargeSize := flag.Uint("stress-key-large-size", 32*1024+1, "the size of each large key written into etcd.") @@ -67,14 +69,18 @@ func main() { eps := strings.Split(*endpointStr, ",") cports := portsFromArg(*clientPorts, len(eps), defaultClientPort) + acports := portsFromArg(*advertiseClientPorts, len(eps), defaultClientPort) pports := portsFromArg(*peerPorts, len(eps), defaultPeerPort) + apports := portsFromArg(*advertisePeerPorts, len(eps), defaultPeerPort) fports := portsFromArg(*failpointPorts, len(eps), defaultFailpointPort) agents := make([]agentConfig, len(eps)) for i := range eps { agents[i].endpoint = eps[i] agents[i].clientPort = cports[i] + agents[i].advertiseClientPort = acports[i] agents[i].peerPort = pports[i] + agents[i].advertisePeerPort = apports[i] agents[i].failpointPort = fports[i] } diff --git a/tools/functional-tester/etcd-tester/member.go b/tools/functional-tester/etcd-tester/member.go index d8567f2dd..bb129753d 100644 --- a/tools/functional-tester/etcd-tester/member.go +++ b/tools/functional-tester/etcd-tester/member.go @@ -29,23 +29,25 @@ import ( ) type member struct { - Agent client.Agent - Endpoint string - Name string - ClientURL string - PeerURL string - FailpointURL string + Agent client.Agent + Endpoint string + Name string + ClientURL string + AdvertiseClientURL string + PeerURL string + AdvertisePeerURL string + FailpointURL string } -func (m *member) ClusterEntry() string { return m.Name + "=" + m.PeerURL } +func (m *member) ClusterEntry() string { return m.Name + "=" + m.AdvertisePeerURL } func (m *member) Flags() []string { return []string{ "--name", m.Name, "--listen-client-urls", m.ClientURL, - "--advertise-client-urls", m.ClientURL, + "--advertise-client-urls", m.AdvertiseClientURL, "--listen-peer-urls", m.PeerURL, - "--initial-advertise-peer-urls", m.PeerURL, + "--initial-advertise-peer-urls", m.AdvertisePeerURL, "--initial-cluster-state", "new", "--experimental-initial-corrupt-check", } @@ -54,7 +56,7 @@ func (m *member) Flags() []string { func (m *member) CheckCompact(rev int64) error { cli, err := m.newClientV3() if err != nil { - return fmt.Errorf("%v (endpoint %s)", err, m.ClientURL) + return fmt.Errorf("%v (endpoint %s)", err, m.AdvertiseClientURL) } defer cli.Close() @@ -64,29 +66,29 @@ func (m *member) CheckCompact(rev int64) error { cancel() if !ok { - return fmt.Errorf("watch channel terminated (endpoint %s)", m.ClientURL) + return fmt.Errorf("watch channel terminated (endpoint %s)", m.AdvertiseClientURL) } if wr.CompactRevision != rev { - return fmt.Errorf("got compact revision %v, wanted %v (endpoint %s)", wr.CompactRevision, rev, m.ClientURL) + return fmt.Errorf("got compact revision %v, wanted %v (endpoint %s)", wr.CompactRevision, rev, m.AdvertiseClientURL) } return nil } func (m *member) Defrag() error { - plog.Printf("defragmenting %s\n", m.ClientURL) + plog.Printf("defragmenting %s\n", m.AdvertiseClientURL) cli, err := m.newClientV3() if err != nil { return err } defer cli.Close() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) - _, err = cli.Defragment(ctx, m.ClientURL) + _, err = cli.Defragment(ctx, m.AdvertiseClientURL) cancel() if err != nil { return err } - plog.Printf("defragmented %s\n", m.ClientURL) + plog.Printf("defragmented %s\n", m.AdvertiseClientURL) return nil } @@ -114,7 +116,7 @@ func (m *member) Rev(ctx context.Context) (int64, error) { return 0, err } defer cli.Close() - resp, err := cli.Status(ctx, m.ClientURL) + resp, err := cli.Status(ctx, m.AdvertiseClientURL) if err != nil { return 0, err } @@ -127,7 +129,7 @@ func (m *member) IsLeader() (bool, error) { return false, err } defer cli.Close() - resp, err := cli.Status(context.Background(), m.ClientURL) + resp, err := cli.Status(context.Background(), m.AdvertiseClientURL) if err != nil { return false, err } @@ -137,7 +139,7 @@ func (m *member) IsLeader() (bool, error) { func (m *member) SetHealthKeyV3() error { cli, err := m.newClientV3() if err != nil { - return fmt.Errorf("%v (%s)", err, m.ClientURL) + return fmt.Errorf("%v (%s)", err, m.AdvertiseClientURL) } defer cli.Close() // give enough time-out in case expensive requests (range/delete) are pending @@ -145,14 +147,14 @@ func (m *member) SetHealthKeyV3() error { _, err = cli.Put(ctx, "health", "good") cancel() if err != nil { - return fmt.Errorf("%v (%s)", err, m.ClientURL) + return fmt.Errorf("%v (%s)", err, m.AdvertiseClientURL) } return nil } func (m *member) newClientV3() (*clientv3.Client, error) { return clientv3.New(clientv3.Config{ - Endpoints: []string{m.ClientURL}, + Endpoints: []string{m.AdvertiseClientURL}, DialTimeout: 5 * time.Second, }) } @@ -163,7 +165,7 @@ func (m *member) dialGRPC() (*grpc.ClientConn, error) { // grpcAddr gets the host from clientURL so it works with grpc.Dial() func (m *member) grpcAddr() string { - u, err := url.Parse(m.ClientURL) + u, err := url.Parse(m.AdvertiseClientURL) if err != nil { panic(err) } @@ -171,7 +173,7 @@ func (m *member) grpcAddr() string { } func (m *member) peerPort() (port int) { - u, err := url.Parse(m.PeerURL) + u, err := url.Parse(m.AdvertisePeerURL) if err != nil { panic(err) }