From b4d311d6a1a04c70803c04f228c99946a02ec12b Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Wed, 16 Oct 2013 13:52:35 -0600 Subject: [PATCH] Reintegrate functional tests into etcd. --- .travis.yml | 3 - test.sh | 10 +- tests/functional/etcd_direct_call.go | 34 +++ tests/functional/init.go | 17 ++ tests/functional/internal_version_test.go | 57 +++++ tests/functional/kill_leader_test.go | 63 ++++++ tests/functional/kill_random_test.go | 76 +++++++ .../multi_node_kill_all_and_recovery_test.go | 72 ++++++ tests/functional/multi_node_kill_one_test.go | 58 +++++ tests/functional/remove_node_test.go | 113 ++++++++++ tests/functional/simple_multi_node_test.go | 62 ++++++ tests/functional/single_node_recovery_test.go | 68 ++++++ tests/functional/single_node_test.go | 77 +++++++ tests/functional/util.go | 205 ++++++++++++++++++ 14 files changed, 906 insertions(+), 9 deletions(-) create mode 100644 tests/functional/etcd_direct_call.go create mode 100644 tests/functional/init.go create mode 100644 tests/functional/internal_version_test.go create mode 100644 tests/functional/kill_leader_test.go create mode 100644 tests/functional/kill_random_test.go create mode 100644 tests/functional/multi_node_kill_all_and_recovery_test.go create mode 100644 tests/functional/multi_node_kill_one_test.go create mode 100644 tests/functional/remove_node_test.go create mode 100644 tests/functional/simple_multi_node_test.go create mode 100644 tests/functional/single_node_recovery_test.go create mode 100644 tests/functional/single_node_test.go create mode 100644 tests/functional/util.go diff --git a/.travis.yml b/.travis.yml index 1c07835fd..708ec2d50 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,5 @@ language: go go: 1.1 -install: - - go get -u github.com/coreos/etcd-test-runner - script: - ./test.sh diff --git a/test.sh b/test.sh index c40f0c7e1..9695e70c2 100755 --- a/test.sh +++ b/test.sh @@ -3,10 +3,8 @@ # Get GOPATH, etc from build . ./build -# Run the tests! -go test -i -go test -v +# Unit tests +go test -v ./store -# Run the functional tests! -go test -i github.com/coreos/etcd-test-runner -ETCD_BIN_PATH=$(pwd)/etcd go test -v github.com/coreos/etcd-test-runner +# Functional tests +ETCD_BIN_PATH=$(pwd)/etcd go test -v ./tests/functional diff --git a/tests/functional/etcd_direct_call.go b/tests/functional/etcd_direct_call.go new file mode 100644 index 000000000..6f629b4fd --- /dev/null +++ b/tests/functional/etcd_direct_call.go @@ -0,0 +1,34 @@ +package test + +import ( + "net/http" + "os" + "testing" + "time" +) + +func BenchmarkEtcdDirectCall(b *testing.B) { + templateBenchmarkEtcdDirectCall(b, false) +} + +func BenchmarkEtcdDirectCallTls(b *testing.B) { + templateBenchmarkEtcdDirectCall(b, true) +} + +func templateBenchmarkEtcdDirectCall(b *testing.B, tls bool) { + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + + clusterSize := 3 + _, etcds, _ := CreateCluster(clusterSize, procAttr, tls) + + defer DestroyCluster(etcds) + + time.Sleep(time.Second) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + resp, _ := http.Get("http://127.0.0.1:4001/test/speed") + resp.Body.Close() + } +} diff --git a/tests/functional/init.go b/tests/functional/init.go new file mode 100644 index 000000000..48e4284b1 --- /dev/null +++ b/tests/functional/init.go @@ -0,0 +1,17 @@ +package test + +import ( + "go/build" + "os" + "path/filepath" +) + +var EtcdBinPath string + +func init() { + // Initialize the 'etcd' binary path or default it to the etcd diretory. + EtcdBinPath = os.Getenv("ETCD_BIN_PATH") + if EtcdBinPath == "" { + EtcdBinPath = filepath.Join(build.Default.GOPATH, "src", "github.com", "coreos", "etcd", "etcd") + } +} diff --git a/tests/functional/internal_version_test.go b/tests/functional/internal_version_test.go new file mode 100644 index 000000000..3413928c1 --- /dev/null +++ b/tests/functional/internal_version_test.go @@ -0,0 +1,57 @@ +package test + +import ( + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "os" + "testing" + "time" +) + +// Ensure that etcd does not come up if the internal raft versions do not match. +func TestInternalVersion(t *testing.T) { + checkedVersion := false + testMux := http.NewServeMux() + + testMux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, "This is not a version number") + checkedVersion = true + }) + + testMux.HandleFunc("/join", func(w http.ResponseWriter, r *http.Request) { + t.Fatal("should not attempt to join!") + }) + + ts := httptest.NewServer(testMux) + defer ts.Close() + + fakeURL, _ := url.Parse(ts.URL) + + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + args := []string{"etcd", "-n=node1", "-f", "-d=/tmp/node1", "-C=" + fakeURL.Host} + + process, err := os.StartProcess(EtcdBinPath, args, procAttr) + if err != nil { + t.Fatal("start process failed:" + err.Error()) + return + } + defer process.Kill() + + time.Sleep(time.Second) + + _, err = http.Get("http://127.0.0.1:4001") + + if err == nil { + t.Fatal("etcd node should not be up") + return + } + + if checkedVersion == false { + t.Fatal("etcd did not check the version") + return + } +} + diff --git a/tests/functional/kill_leader_test.go b/tests/functional/kill_leader_test.go new file mode 100644 index 000000000..ad9113695 --- /dev/null +++ b/tests/functional/kill_leader_test.go @@ -0,0 +1,63 @@ +package test + +import ( + "fmt" + "os" + "strconv" + "strings" + "testing" + "time" +) + +// This test will kill the current leader and wait for the etcd cluster to elect a new leader for 200 times. +// It will print out the election time and the average election time. +func TestKillLeader(t *testing.T) { + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + + clusterSize := 5 + argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false) + if err != nil { + t.Fatal("cannot create cluster") + } + defer DestroyCluster(etcds) + + stop := make(chan bool) + leaderChan := make(chan string, 1) + all := make(chan bool, 1) + + time.Sleep(time.Second) + + go Monitor(clusterSize, 1, leaderChan, all, stop) + + var totalTime time.Duration + + leader := "http://127.0.0.1:7001" + + for i := 0; i < clusterSize; i++ { + fmt.Println("leader is ", leader) + port, _ := strconv.Atoi(strings.Split(leader, ":")[2]) + num := port - 7001 + fmt.Println("kill server ", num) + etcds[num].Kill() + etcds[num].Release() + + start := time.Now() + for { + newLeader := <-leaderChan + if newLeader != leader { + leader = newLeader + break + } + } + take := time.Now().Sub(start) + + totalTime += take + avgTime := totalTime / (time.Duration)(i+1) + fmt.Println("Total time:", totalTime, "; Avg time:", avgTime) + + etcds[num], err = os.StartProcess(EtcdBinPath, argGroup[num], procAttr) + } + stop <- true +} + diff --git a/tests/functional/kill_random_test.go b/tests/functional/kill_random_test.go new file mode 100644 index 000000000..4fc3c1171 --- /dev/null +++ b/tests/functional/kill_random_test.go @@ -0,0 +1,76 @@ +package test + +import ( + "fmt" + "math/rand" + "os" + "testing" + "time" +) + +// TestKillRandom kills random machines in the cluster and +// restart them after all other machines agree on the same leader +func TestKillRandom(t *testing.T) { + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + + clusterSize := 9 + argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false) + + if err != nil { + t.Fatal("cannot create cluster") + } + + defer DestroyCluster(etcds) + + stop := make(chan bool) + leaderChan := make(chan string, 1) + all := make(chan bool, 1) + + time.Sleep(3 * time.Second) + + go Monitor(clusterSize, 4, leaderChan, all, stop) + + toKill := make(map[int]bool) + + for i := 0; i < 20; i++ { + fmt.Printf("TestKillRandom Round[%d/20]\n", i) + + j := 0 + for { + + r := rand.Int31n(9) + if _, ok := toKill[int(r)]; !ok { + j++ + toKill[int(r)] = true + } + + if j > 3 { + break + } + + } + + for num, _ := range toKill { + err := etcds[num].Kill() + if err != nil { + panic(err) + } + etcds[num].Wait() + } + + time.Sleep(1 * time.Second) + + <-leaderChan + + for num, _ := range toKill { + etcds[num], err = os.StartProcess(EtcdBinPath, argGroup[num], procAttr) + } + + toKill = make(map[int]bool) + <-all + } + + stop <- true +} + diff --git a/tests/functional/multi_node_kill_all_and_recovery_test.go b/tests/functional/multi_node_kill_all_and_recovery_test.go new file mode 100644 index 000000000..454b9648c --- /dev/null +++ b/tests/functional/multi_node_kill_all_and_recovery_test.go @@ -0,0 +1,72 @@ +package test + +import ( + "os" + "testing" + "time" + + "github.com/coreos/go-etcd/etcd" +) + +// Create a five nodes +// Kill all the nodes and restart +func TestMultiNodeKillAllAndRecovery(t *testing.T) { + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + + clusterSize := 5 + argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false) + defer DestroyCluster(etcds) + + if err != nil { + t.Fatal("cannot create cluster") + } + + c := etcd.NewClient(nil) + + c.SyncCluster() + + time.Sleep(time.Second) + + // send 10 commands + for i := 0; i < 10; i++ { + // Test Set + _, err := c.Set("foo", "bar", 0) + if err != nil { + panic(err) + } + } + + time.Sleep(time.Second) + + // kill all + DestroyCluster(etcds) + + time.Sleep(time.Second) + + stop := make(chan bool) + leaderChan := make(chan string, 1) + all := make(chan bool, 1) + + time.Sleep(time.Second) + + for i := 0; i < clusterSize; i++ { + etcds[i], err = os.StartProcess(EtcdBinPath, argGroup[i], procAttr) + } + + go Monitor(clusterSize, 1, leaderChan, all, stop) + + <-all + <-leaderChan + + result, err := c.Set("foo", "bar", 0) + + if err != nil { + t.Fatalf("Recovery error: %s", err) + } + + if result.Index != 18 { + t.Fatalf("recovery failed! [%d/18]", result.Index) + } +} + diff --git a/tests/functional/multi_node_kill_one_test.go b/tests/functional/multi_node_kill_one_test.go new file mode 100644 index 000000000..725d54b6c --- /dev/null +++ b/tests/functional/multi_node_kill_one_test.go @@ -0,0 +1,58 @@ +package test + +import ( + "fmt" + "math/rand" + "os" + "testing" + "time" + + "github.com/coreos/go-etcd/etcd" +) + +// Create a five nodes +// Randomly kill one of the node and keep on sending set command to the cluster +func TestMultiNodeKillOne(t *testing.T) { + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + + clusterSize := 5 + argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false) + + if err != nil { + t.Fatal("cannot create cluster") + } + + defer DestroyCluster(etcds) + + time.Sleep(2 * time.Second) + + c := etcd.NewClient(nil) + + c.SyncCluster() + + stop := make(chan bool) + // Test Set + go Set(stop) + + for i := 0; i < 10; i++ { + num := rand.Int() % clusterSize + fmt.Println("kill node", num+1) + + // kill + etcds[num].Kill() + etcds[num].Release() + time.Sleep(time.Second) + + // restart + etcds[num], err = os.StartProcess(EtcdBinPath, argGroup[num], procAttr) + if err != nil { + panic(err) + } + time.Sleep(time.Second) + } + fmt.Println("stop") + stop <- true + <-stop +} + diff --git a/tests/functional/remove_node_test.go b/tests/functional/remove_node_test.go new file mode 100644 index 000000000..09fa747aa --- /dev/null +++ b/tests/functional/remove_node_test.go @@ -0,0 +1,113 @@ +package test + +import ( + "net/http" + "os" + "testing" + "time" + + "github.com/coreos/go-etcd/etcd" +) + +// remove the node and node rejoin with previous log +func TestRemoveNode(t *testing.T) { + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + + clusterSize := 3 + argGroup, etcds, _ := CreateCluster(clusterSize, procAttr, false) + defer DestroyCluster(etcds) + + time.Sleep(time.Second) + + c := etcd.NewClient(nil) + + c.SyncCluster() + + rmReq, _ := http.NewRequest("DELETE", "http://127.0.0.1:7001/remove/node3", nil) + + client := &http.Client{} + for i := 0; i < 2; i++ { + for i := 0; i < 2; i++ { + client.Do(rmReq) + + etcds[2].Wait() + + resp, err := c.Get("_etcd/machines") + + if err != nil { + panic(err) + } + + if len(resp) != 2 { + t.Fatal("cannot remove machine") + } + + if i == 1 { + // rejoin with log + etcds[2], err = os.StartProcess(EtcdBinPath, argGroup[2], procAttr) + } else { + // rejoin without log + etcds[2], err = os.StartProcess(EtcdBinPath, append(argGroup[2], "-f"), procAttr) + } + + if err != nil { + panic(err) + } + + time.Sleep(time.Second) + + resp, err = c.Get("_etcd/machines") + + if err != nil { + panic(err) + } + + if len(resp) != 3 { + t.Fatalf("add machine fails #1 (%d != 3)", len(resp)) + } + } + + // first kill the node, then remove it, then add it back + for i := 0; i < 2; i++ { + etcds[2].Kill() + etcds[2].Wait() + + client.Do(rmReq) + + resp, err := c.Get("_etcd/machines") + + if err != nil { + panic(err) + } + + if len(resp) != 2 { + t.Fatal("cannot remove machine") + } + + if i == 1 { + // rejoin with log + etcds[2], err = os.StartProcess(EtcdBinPath, append(argGroup[2]), procAttr) + } else { + // rejoin without log + etcds[2], err = os.StartProcess(EtcdBinPath, append(argGroup[2], "-f"), procAttr) + } + + if err != nil { + panic(err) + } + + time.Sleep(time.Second) + + resp, err = c.Get("_etcd/machines") + + if err != nil { + panic(err) + } + + if len(resp) != 3 { + t.Fatalf("add machine fails #2 (%d != 3)", len(resp)) + } + } + } +} diff --git a/tests/functional/simple_multi_node_test.go b/tests/functional/simple_multi_node_test.go new file mode 100644 index 000000000..5d93cbf5c --- /dev/null +++ b/tests/functional/simple_multi_node_test.go @@ -0,0 +1,62 @@ +package test + +import ( + "os" + "testing" + "time" + + "github.com/coreos/go-etcd/etcd" +) + +func TestSimpleMultiNode(t *testing.T) { + templateTestSimpleMultiNode(t, false) +} + +func TestSimpleMultiNodeTls(t *testing.T) { + templateTestSimpleMultiNode(t, true) +} + +// Create a three nodes and try to set value +func templateTestSimpleMultiNode(t *testing.T, tls bool) { + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + + clusterSize := 3 + + _, etcds, err := CreateCluster(clusterSize, procAttr, tls) + + if err != nil { + t.Fatal("cannot create cluster") + } + + defer DestroyCluster(etcds) + + time.Sleep(time.Second) + + c := etcd.NewClient(nil) + + c.SyncCluster() + + // Test Set + result, err := c.Set("foo", "bar", 100) + + if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL < 95 { + if err != nil { + t.Fatal(err) + } + + t.Fatalf("Set 1 failed with %s %s %v", result.Key, result.Value, result.TTL) + } + + time.Sleep(time.Second) + + result, err = c.Set("foo", "bar", 100) + + if err != nil || result.Key != "/foo" || result.Value != "bar" || result.PrevValue != "bar" || result.TTL != 100 { + if err != nil { + t.Fatal(err) + } + t.Fatalf("Set 2 failed with %s %s %v", result.Key, result.Value, result.TTL) + } + +} diff --git a/tests/functional/single_node_recovery_test.go b/tests/functional/single_node_recovery_test.go new file mode 100644 index 000000000..1b0d7f878 --- /dev/null +++ b/tests/functional/single_node_recovery_test.go @@ -0,0 +1,68 @@ +package test + +import ( + "os" + "testing" + "time" + + "github.com/coreos/go-etcd/etcd" +) + +// This test creates a single node and then set a value to it. +// Then this test kills the node and restart it and tries to get the value again. +func TestSingleNodeRecovery(t *testing.T) { + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + args := []string{"etcd", "-n=node1", "-d=/tmp/node1"} + + process, err := os.StartProcess(EtcdBinPath, append(args, "-f"), procAttr) + if err != nil { + t.Fatal("start process failed:" + err.Error()) + return + } + + time.Sleep(time.Second) + + c := etcd.NewClient(nil) + + c.SyncCluster() + // Test Set + result, err := c.Set("foo", "bar", 100) + + if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL < 95 { + if err != nil { + t.Fatal(err) + } + + t.Fatalf("Set 1 failed with %s %s %v", result.Key, result.Value, result.TTL) + } + + time.Sleep(time.Second) + + process.Kill() + + process, err = os.StartProcess(EtcdBinPath, args, procAttr) + defer process.Kill() + if err != nil { + t.Fatal("start process failed:" + err.Error()) + return + } + + time.Sleep(time.Second) + + results, err := c.Get("foo") + if err != nil { + t.Fatal("get fail: " + err.Error()) + return + } + + result = results[0] + + if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL > 99 { + if err != nil { + t.Fatal(err) + } + t.Fatalf("Recovery Get failed with %s %s %v", result.Key, result.Value, result.TTL) + } +} + diff --git a/tests/functional/single_node_test.go b/tests/functional/single_node_test.go new file mode 100644 index 000000000..04362e328 --- /dev/null +++ b/tests/functional/single_node_test.go @@ -0,0 +1,77 @@ +package test + +import ( + "os" + "testing" + "time" + + "github.com/coreos/go-etcd/etcd" +) + +// Create a single node and try to set value +func TestSingleNode(t *testing.T) { + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + args := []string{"etcd", "-n=node1", "-f", "-d=/tmp/node1"} + + process, err := os.StartProcess(EtcdBinPath, args, procAttr) + if err != nil { + t.Fatal("start process failed:" + err.Error()) + return + } + defer process.Kill() + + time.Sleep(time.Second) + + c := etcd.NewClient(nil) + + c.SyncCluster() + // Test Set + result, err := c.Set("foo", "bar", 100) + + if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL < 95 { + if err != nil { + t.Fatal("Set 1: ", err) + } + + t.Fatalf("Set 1 failed with %s %s %v", result.Key, result.Value, result.TTL) + } + + time.Sleep(time.Second) + + result, err = c.Set("foo", "bar", 100) + + if err != nil || result.Key != "/foo" || result.Value != "bar" || result.PrevValue != "bar" || result.TTL != 100 { + if err != nil { + t.Fatal("Set 2: ", err) + } + t.Fatalf("Set 2 failed with %s %s %v", result.Key, result.Value, result.TTL) + } + + // Add a test-and-set test + + // First, we'll test we can change the value if we get it write + result, match, err := c.TestAndSet("foo", "bar", "foobar", 100) + + if err != nil || result.Key != "/foo" || result.Value != "foobar" || result.PrevValue != "bar" || result.TTL != 100 || !match { + if err != nil { + t.Fatal(err) + } + t.Fatalf("Set 3 failed with %s %s %v", result.Key, result.Value, result.TTL) + } + + // Next, we'll make sure we can't set it without the correct prior value + _, _, err = c.TestAndSet("foo", "bar", "foofoo", 100) + + if err == nil { + t.Fatalf("Set 4 expecting error when setting key with incorrect previous value") + } + + // Finally, we'll make sure a blank previous value still counts as a test-and-set and still has to match + _, _, err = c.TestAndSet("foo", "", "barbar", 100) + + if err == nil { + t.Fatalf("Set 5 expecting error when setting key with blank (incorrect) previous value") + } +} + diff --git a/tests/functional/util.go b/tests/functional/util.go new file mode 100644 index 000000000..981ff1be6 --- /dev/null +++ b/tests/functional/util.go @@ -0,0 +1,205 @@ +package test + +import ( + "fmt" + "github.com/coreos/go-etcd/etcd" + "io/ioutil" + "net" + "net/http" + "os" + "strconv" + "time" +) + +var client = http.Client{ + Transport: &http.Transport{ + Dial: dialTimeoutFast, + }, +} + +// Sending set commands +func Set(stop chan bool) { + + stopSet := false + i := 0 + c := etcd.NewClient(nil) + for { + key := fmt.Sprintf("%s_%v", "foo", i) + + result, err := c.Set(key, "bar", 0) + + if err != nil || result.Key != "/"+key || result.Value != "bar" { + select { + case <-stop: + stopSet = true + + default: + } + } + + select { + case <-stop: + stopSet = true + + default: + } + + if stopSet { + break + } + + i++ + } + stop <- true +} + +// Create a cluster of etcd nodes +func CreateCluster(size int, procAttr *os.ProcAttr, ssl bool) ([][]string, []*os.Process, error) { + argGroup := make([][]string, size) + + sslServer1 := []string{"-serverCAFile=./fixtures/ca/ca.crt", + "-serverCert=./fixtures/ca/server.crt", + "-serverKey=./fixtures/ca/server.key.insecure", + } + + sslServer2 := []string{"-serverCAFile=./fixtures/ca/ca.crt", + "-serverCert=./fixtures/ca/server2.crt", + "-serverKey=./fixtures/ca/server2.key.insecure", + } + + for i := 0; i < size; i++ { + if i == 0 { + argGroup[i] = []string{"etcd", "-d=/tmp/node1", "-n=node1"} + if ssl { + argGroup[i] = append(argGroup[i], sslServer1...) + } + } else { + strI := strconv.Itoa(i + 1) + argGroup[i] = []string{"etcd", "-n=node" + strI, "-c=127.0.0.1:400" + strI, "-s=127.0.0.1:700" + strI, "-d=/tmp/node" + strI, "-C=127.0.0.1:7001"} + if ssl { + argGroup[i] = append(argGroup[i], sslServer2...) + } + } + } + + etcds := make([]*os.Process, size) + + for i, _ := range etcds { + var err error + etcds[i], err = os.StartProcess(EtcdBinPath, append(argGroup[i], "-f"), procAttr) + if err != nil { + return nil, nil, err + } + + // TODOBP: Change this sleep to wait until the master is up. + // The problem is that if the master isn't up then the children + // have to retry. This retry can take upwards of 15 seconds + // which slows tests way down and some of them fail. + if i == 0 { + time.Sleep(time.Second * 2) + } + } + + return argGroup, etcds, nil +} + +// Destroy all the nodes in the cluster +func DestroyCluster(etcds []*os.Process) error { + for _, etcd := range etcds { + err := etcd.Kill() + if err != nil { + panic(err.Error()) + } + etcd.Release() + } + return nil +} + +// +func Monitor(size int, allowDeadNum int, leaderChan chan string, all chan bool, stop chan bool) { + leaderMap := make(map[int]string) + baseAddrFormat := "http://0.0.0.0:400%d" + + for { + knownLeader := "unknown" + dead := 0 + var i int + + for i = 0; i < size; i++ { + leader, err := getLeader(fmt.Sprintf(baseAddrFormat, i+1)) + + if err == nil { + leaderMap[i] = leader + + if knownLeader == "unknown" { + knownLeader = leader + } else { + if leader != knownLeader { + break + } + + } + + } else { + dead++ + if dead > allowDeadNum { + break + } + } + + } + + if i == size { + select { + case <-stop: + return + case <-leaderChan: + leaderChan <- knownLeader + default: + leaderChan <- knownLeader + } + + } + if dead == 0 { + select { + case <-all: + all <- true + default: + all <- true + } + } + + time.Sleep(time.Millisecond * 10) + } + +} + +func getLeader(addr string) (string, error) { + + resp, err := client.Get(addr + "/v1/leader") + + if err != nil { + return "", err + } + + if resp.StatusCode != http.StatusOK { + resp.Body.Close() + return "", fmt.Errorf("no leader") + } + + b, err := ioutil.ReadAll(resp.Body) + + resp.Body.Close() + + if err != nil { + return "", err + } + + return string(b), nil + +} + +// Dial with timeout +func dialTimeoutFast(network, addr string) (net.Conn, error) { + return net.DialTimeout(network, addr, time.Millisecond*10) +}