etcd-tester: set advertise ports, delay w/ network faults

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
Gyuho Lee 2018-01-02 08:55:59 -08:00
parent 18df07754f
commit bbfd0077e8
5 changed files with 76 additions and 38 deletions

View File

@ -30,10 +30,12 @@ import (
// agentConfig holds information needed to interact/configure an agent and its etcd process
type agentConfig struct {
endpoint string
clientPort int
peerPort int
failpointPort int
endpoint string
clientPort int
advertiseClientPort int
peerPort int
advertisePeerPort int
failpointPort int
}
type cluster struct {
@ -61,12 +63,14 @@ func (c *cluster) bootstrap() error {
return err
}
members[i] = &member{
Agent: agent,
Endpoint: a.endpoint,
Name: fmt.Sprintf("etcd-%d", i),
ClientURL: fmt.Sprintf("http://%s:%d", host, a.clientPort),
PeerURL: fmt.Sprintf("http://%s:%d", host, a.peerPort),
FailpointURL: fmt.Sprintf("http://%s:%d", host, a.failpointPort),
Agent: agent,
Endpoint: a.endpoint,
Name: fmt.Sprintf("etcd-%d", i),
ClientURL: fmt.Sprintf("http://%s:%d", host, a.clientPort),
AdvertiseClientURL: fmt.Sprintf("http://%s:%d", host, a.advertiseClientPort),
PeerURL: fmt.Sprintf("http://%s:%d", host, a.peerPort),
AdvertisePeerURL: fmt.Sprintf("http://%s:%d", host, a.advertisePeerPort),
FailpointURL: fmt.Sprintf("http://%s:%d", host, a.failpointPort),
}
memberNameURLs[i] = members[i].ClusterEntry()
}

View File

@ -128,7 +128,10 @@ func (f *failureDelay) Inject(c *cluster, round int) error {
if err := f.failure.Inject(c, round); err != nil {
return err
}
time.Sleep(f.delayDuration)
if f.delayDuration > 0 {
plog.Infof("sleeping delay duration %v for %q", f.delayDuration, f.failure.Desc())
time.Sleep(f.delayDuration)
}
return nil
}

View File

@ -24,6 +24,9 @@ const (
slowNetworkLatency = 500 // 500 millisecond
randomVariation = 50
// delay duration to trigger leader election (default election timeout 1s)
triggerElectionDur = 5 * time.Second
// Wait more when it recovers from slow network, because network layer
// needs extra time to propagate traffic control (tc command) change.
// Otherwise, we get different hash values from the previous revision.
@ -82,19 +85,27 @@ func injectDropPort(m *member) error { return m.Agent.DropPort(m.peerPort()) }
func recoverDropPort(m *member) error { return m.Agent.RecoverPort(m.peerPort()) }
func newFailureIsolate() failure {
return &failureOne{
f := &failureOne{
description: "isolate one member",
injectMember: injectDropPort,
recoverMember: recoverDropPort,
}
return &failureDelay{
failure: f,
delayDuration: triggerElectionDur,
}
}
func newFailureIsolateAll() failure {
return &failureAll{
f := &failureAll{
description: "isolate all members",
injectMember: injectDropPort,
recoverMember: recoverDropPort,
}
return &failureDelay{
failure: f,
delayDuration: triggerElectionDur,
}
}
func injectLatency(m *member) error {
@ -115,11 +126,15 @@ func recoverLatency(m *member) error {
func newFailureSlowNetworkOneMember() failure {
desc := fmt.Sprintf("slow down one member's network by adding %d ms latency", slowNetworkLatency)
return &failureOne{
f := &failureOne{
description: description(desc),
injectMember: injectLatency,
recoverMember: recoverLatency,
}
return &failureDelay{
failure: f,
delayDuration: triggerElectionDur,
}
}
func newFailureSlowNetworkLeader() failure {
@ -129,15 +144,23 @@ func newFailureSlowNetworkLeader() failure {
injectMember: injectLatency,
recoverMember: recoverLatency,
}
return &failureLeader{ff, 0}
f := &failureLeader{ff, 0}
return &failureDelay{
failure: f,
delayDuration: triggerElectionDur,
}
}
func newFailureSlowNetworkAll() failure {
return &failureAll{
f := &failureAll{
description: "slow down all members' network",
injectMember: injectLatency,
recoverMember: recoverLatency,
}
return &failureDelay{
failure: f,
delayDuration: triggerElectionDur,
}
}
func newFailureNop() failure {

View File

@ -41,7 +41,9 @@ const (
func main() {
endpointStr := flag.String("agent-endpoints", "localhost:9027", "HTTP RPC endpoints of agents. Do not specify the schema.")
clientPorts := flag.String("client-ports", "", "etcd client port for each agent endpoint")
advertiseClientPorts := flag.String("advertise-client-ports", "", "etcd advertise client port for each agent endpoint")
peerPorts := flag.String("peer-ports", "", "etcd peer port for each agent endpoint")
advertisePeerPorts := flag.String("advertise-peer-ports", "", "etcd advertise peer port for each agent endpoint")
failpointPorts := flag.String("failpoint-ports", "", "etcd failpoint port for each agent endpoint")
stressKeyLargeSize := flag.Uint("stress-key-large-size", 32*1024+1, "the size of each large key written into etcd.")
@ -67,14 +69,18 @@ func main() {
eps := strings.Split(*endpointStr, ",")
cports := portsFromArg(*clientPorts, len(eps), defaultClientPort)
acports := portsFromArg(*advertiseClientPorts, len(eps), defaultClientPort)
pports := portsFromArg(*peerPorts, len(eps), defaultPeerPort)
apports := portsFromArg(*advertisePeerPorts, len(eps), defaultPeerPort)
fports := portsFromArg(*failpointPorts, len(eps), defaultFailpointPort)
agents := make([]agentConfig, len(eps))
for i := range eps {
agents[i].endpoint = eps[i]
agents[i].clientPort = cports[i]
agents[i].advertiseClientPort = acports[i]
agents[i].peerPort = pports[i]
agents[i].advertisePeerPort = apports[i]
agents[i].failpointPort = fports[i]
}

View File

@ -29,23 +29,25 @@ import (
)
type member struct {
Agent client.Agent
Endpoint string
Name string
ClientURL string
PeerURL string
FailpointURL string
Agent client.Agent
Endpoint string
Name string
ClientURL string
AdvertiseClientURL string
PeerURL string
AdvertisePeerURL string
FailpointURL string
}
func (m *member) ClusterEntry() string { return m.Name + "=" + m.PeerURL }
func (m *member) ClusterEntry() string { return m.Name + "=" + m.AdvertisePeerURL }
func (m *member) Flags() []string {
return []string{
"--name", m.Name,
"--listen-client-urls", m.ClientURL,
"--advertise-client-urls", m.ClientURL,
"--advertise-client-urls", m.AdvertiseClientURL,
"--listen-peer-urls", m.PeerURL,
"--initial-advertise-peer-urls", m.PeerURL,
"--initial-advertise-peer-urls", m.AdvertisePeerURL,
"--initial-cluster-state", "new",
"--experimental-initial-corrupt-check",
}
@ -54,7 +56,7 @@ func (m *member) Flags() []string {
func (m *member) CheckCompact(rev int64) error {
cli, err := m.newClientV3()
if err != nil {
return fmt.Errorf("%v (endpoint %s)", err, m.ClientURL)
return fmt.Errorf("%v (endpoint %s)", err, m.AdvertiseClientURL)
}
defer cli.Close()
@ -64,29 +66,29 @@ func (m *member) CheckCompact(rev int64) error {
cancel()
if !ok {
return fmt.Errorf("watch channel terminated (endpoint %s)", m.ClientURL)
return fmt.Errorf("watch channel terminated (endpoint %s)", m.AdvertiseClientURL)
}
if wr.CompactRevision != rev {
return fmt.Errorf("got compact revision %v, wanted %v (endpoint %s)", wr.CompactRevision, rev, m.ClientURL)
return fmt.Errorf("got compact revision %v, wanted %v (endpoint %s)", wr.CompactRevision, rev, m.AdvertiseClientURL)
}
return nil
}
func (m *member) Defrag() error {
plog.Printf("defragmenting %s\n", m.ClientURL)
plog.Printf("defragmenting %s\n", m.AdvertiseClientURL)
cli, err := m.newClientV3()
if err != nil {
return err
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
_, err = cli.Defragment(ctx, m.ClientURL)
_, err = cli.Defragment(ctx, m.AdvertiseClientURL)
cancel()
if err != nil {
return err
}
plog.Printf("defragmented %s\n", m.ClientURL)
plog.Printf("defragmented %s\n", m.AdvertiseClientURL)
return nil
}
@ -114,7 +116,7 @@ func (m *member) Rev(ctx context.Context) (int64, error) {
return 0, err
}
defer cli.Close()
resp, err := cli.Status(ctx, m.ClientURL)
resp, err := cli.Status(ctx, m.AdvertiseClientURL)
if err != nil {
return 0, err
}
@ -127,7 +129,7 @@ func (m *member) IsLeader() (bool, error) {
return false, err
}
defer cli.Close()
resp, err := cli.Status(context.Background(), m.ClientURL)
resp, err := cli.Status(context.Background(), m.AdvertiseClientURL)
if err != nil {
return false, err
}
@ -137,7 +139,7 @@ func (m *member) IsLeader() (bool, error) {
func (m *member) SetHealthKeyV3() error {
cli, err := m.newClientV3()
if err != nil {
return fmt.Errorf("%v (%s)", err, m.ClientURL)
return fmt.Errorf("%v (%s)", err, m.AdvertiseClientURL)
}
defer cli.Close()
// give enough time-out in case expensive requests (range/delete) are pending
@ -145,14 +147,14 @@ func (m *member) SetHealthKeyV3() error {
_, err = cli.Put(ctx, "health", "good")
cancel()
if err != nil {
return fmt.Errorf("%v (%s)", err, m.ClientURL)
return fmt.Errorf("%v (%s)", err, m.AdvertiseClientURL)
}
return nil
}
func (m *member) newClientV3() (*clientv3.Client, error) {
return clientv3.New(clientv3.Config{
Endpoints: []string{m.ClientURL},
Endpoints: []string{m.AdvertiseClientURL},
DialTimeout: 5 * time.Second,
})
}
@ -163,7 +165,7 @@ func (m *member) dialGRPC() (*grpc.ClientConn, error) {
// grpcAddr gets the host from clientURL so it works with grpc.Dial()
func (m *member) grpcAddr() string {
u, err := url.Parse(m.ClientURL)
u, err := url.Parse(m.AdvertiseClientURL)
if err != nil {
panic(err)
}
@ -171,7 +173,7 @@ func (m *member) grpcAddr() string {
}
func (m *member) peerPort() (port int) {
u, err := url.Parse(m.PeerURL)
u, err := url.Parse(m.AdvertisePeerURL)
if err != nil {
panic(err)
}