From 00157ccdceff62f92a02554ad113eb9fa0137a91 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 30 Jul 2013 16:47:46 -0700 Subject: [PATCH] add killleader test and leader monitor func --- etcd_long_test.go | 62 ++++++++++++++++++ etcd_test.go | 79 +--------------------- test.go | 164 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 227 insertions(+), 78 deletions(-) create mode 100644 etcd_long_test.go create mode 100644 test.go diff --git a/etcd_long_test.go b/etcd_long_test.go new file mode 100644 index 000000000..258d60eaf --- /dev/null +++ b/etcd_long_test.go @@ -0,0 +1,62 @@ +package main + +import ( + "fmt" + "os" + "strconv" + "strings" + "testing" + "time" +) + +// This test will kill the current leader and wait for the etcd cluster to elect a new leader for 200 times. +// It will print out the election time and the average election time. +func TestKillLeader(t *testing.T) { + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + + clusterSize := 5 + argGroup, etcds, err := createCluster(clusterSize, procAttr) + + if err != nil { + t.Fatal("cannot create cluster") + } + + defer destroyCluster(etcds) + + leaderChan := make(chan string, 1) + + time.Sleep(time.Second) + + go leaderMonitor(clusterSize, 1, leaderChan) + + var totalTime time.Duration + + leader := "0.0.0.0:7001" + + for i := 0; i < 200; i++ { + port, _ := strconv.Atoi(strings.Split(leader, ":")[1]) + num := port - 7001 + fmt.Println("kill server ", num) + etcds[num].Kill() + etcds[num].Release() + + start := time.Now() + for { + newLeader := <-leaderChan + if newLeader != leader { + leader = newLeader + break + } + } + take := time.Now().Sub(start) + + totalTime += take + avgTime := totalTime / (time.Duration)(i+1) + + fmt.Println("Leader election time is ", take, "with election timeout", ELECTIONTIMTOUT) + fmt.Println("Leader election time average is", avgTime, "with election timeout", ELECTIONTIMTOUT) + etcds[num], err = os.StartProcess("etcd", argGroup[num], procAttr) + } + +} diff --git a/etcd_test.go b/etcd_test.go index 6d1e431c7..51721dea7 100644 --- a/etcd_test.go +++ b/etcd_test.go @@ -5,7 +5,7 @@ import ( "github.com/coreos/go-etcd/etcd" "math/rand" "os" - "strconv" + //"strconv" "testing" "time" ) @@ -191,80 +191,3 @@ func TestMultiNodeRecovery(t *testing.T) { stop <- true <-stop } - -// Sending set commands -func set(stop chan bool) { - - stopSet := false - i := 0 - - for { - key := fmt.Sprintf("%s_%v", "foo", i) - - result, err := etcd.Set(key, "bar", 0) - - if err != nil || result.Key != "/"+key || result.Value != "bar" { - select { - case <-stop: - stopSet = true - - default: - fmt.Println("Set failed!") - return - } - } - - select { - case <-stop: - stopSet = true - - default: - } - - if stopSet { - break - } - - i++ - } - fmt.Println("set stop") - stop <- true -} - -// Create a cluster of etcd nodes -func createCluster(size int, procAttr *os.ProcAttr) ([][]string, []*os.Process, error) { - argGroup := make([][]string, size) - for i := 0; i < size; i++ { - if i == 0 { - argGroup[i] = []string{"etcd", "-d=/tmp/node1"} - } else { - strI := strconv.Itoa(i + 1) - argGroup[i] = []string{"etcd", "-c=400" + strI, "-s=700" + strI, "-d=/tmp/node" + strI, "-C=127.0.0.1:7001"} - } - } - - etcds := make([]*os.Process, size) - - for i, _ := range etcds { - var err error - etcds[i], err = os.StartProcess("etcd", append(argGroup[i], "-i"), procAttr) - if err != nil { - return nil, nil, err - } - } - - return argGroup, etcds, nil -} - -// Destroy all the nodes in the cluster -func destroyCluster(etcds []*os.Process) error { - for i, etcd := range etcds { - err := etcd.Kill() - fmt.Println("kill ", i) - if err != nil { - panic(err.Error()) - } - etcd.Release() - } - return nil -} diff --git a/test.go b/test.go new file mode 100644 index 000000000..43ad70e50 --- /dev/null +++ b/test.go @@ -0,0 +1,164 @@ +package main + +import ( + "fmt" + "github.com/coreos/go-etcd/etcd" + "io/ioutil" + "net" + "net/http" + "os" + "strconv" + "time" +) + +var client = http.Client{Transport: &http.Transport{ + Dial: dialTimeoutFast, +}, +} + +// Sending set commands +func set(stop chan bool) { + + stopSet := false + i := 0 + + for { + key := fmt.Sprintf("%s_%v", "foo", i) + + result, err := etcd.Set(key, "bar", 0) + + if err != nil || result.Key != "/"+key || result.Value != "bar" { + select { + case <-stop: + stopSet = true + + default: + fmt.Println("Set failed!") + return + } + } + + select { + case <-stop: + stopSet = true + + default: + } + + if stopSet { + break + } + + i++ + } + fmt.Println("set stop") + stop <- true +} + +// Create a cluster of etcd nodes +func createCluster(size int, procAttr *os.ProcAttr) ([][]string, []*os.Process, error) { + argGroup := make([][]string, size) + for i := 0; i < size; i++ { + if i == 0 { + argGroup[i] = []string{"etcd", "-d=/tmp/node1"} + } else { + strI := strconv.Itoa(i + 1) + argGroup[i] = []string{"etcd", "-c=400" + strI, "-s=700" + strI, "-d=/tmp/node" + strI, "-C=127.0.0.1:7001"} + } + } + + etcds := make([]*os.Process, size) + + for i, _ := range etcds { + var err error + etcds[i], err = os.StartProcess("etcd", append(argGroup[i], "-i"), procAttr) + if err != nil { + return nil, nil, err + } + } + + return argGroup, etcds, nil +} + +// Destroy all the nodes in the cluster +func destroyCluster(etcds []*os.Process) error { + for i, etcd := range etcds { + err := etcd.Kill() + fmt.Println("kill ", i) + if err != nil { + panic(err.Error()) + } + etcd.Release() + } + return nil +} + +// +func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) { + leaderMap := make(map[int]string) + baseAddrFormat := "http://0.0.0.0:400%d/leader" + for { + knownLeader := "unknown" + dead := 0 + var i int + for i = 0; i < size; i++ { + leader, err := getLeader(fmt.Sprintf(baseAddrFormat, i+1)) + if err == nil { + leaderMap[i] = leader + + if knownLeader == "unknown" { + knownLeader = leader + } else { + if leader != knownLeader { + break + } + } + } else { + dead++ + if dead > allowDeadNum { + break + } + } + } + if i == size { + select { + case <-leaderChan: + leaderChan <- knownLeader + default: + leaderChan <- knownLeader + } + + } + time.Sleep(time.Millisecond * 10) + } +} + +func getLeader(addr string) (string, error) { + + resp, err := client.Get(addr) + + if err != nil { + return "", err + } + + if resp.StatusCode != http.StatusOK { + resp.Body.Close() + return "", fmt.Errorf("no leader") + } + + b, err := ioutil.ReadAll(resp.Body) + + resp.Body.Close() + + if err != nil { + return "", err + } + + return string(b), nil + +} + +// Dial with timeout +func dialTimeoutFast(network, addr string) (net.Conn, error) { + return net.DialTimeout(network, addr, time.Millisecond*10) +}