mirror of https://github.com/etcd-io/etcd.git
*: remove unused pkgs
@@ -1,93 +0,0 @@
// +build ignore

package test

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert"

	"github.com/coreos/etcd/server"
	etcdtest "github.com/coreos/etcd/tests"
	goetcd "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
)

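// garbageHandler records whether it has served the expected registry request,
// failing the test if any other path is hit.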
type garbageHandler struct {
	t       *testing.T
	success bool
	sync.Mutex
}

func (g *garbageHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	fmt.Fprintln(w, "Hello, client")
	if r.URL.String() != "/v2/keys/_etcd/registry/1/node1" {
		g.t.Fatalf("Unexpected web request")
	}
	g.Lock()
	defer g.Unlock()

	g.success = true
}

// TestDiscoverySecondPeerFirstNoResponse ensures that if the first etcd
// machine stops after heartbeating, the second machine fails too.
func TestDiscoverySecondPeerFirstNoResponse(t *testing.T) {
	etcdtest.RunServer(func(s *server.Server) {
		v := url.Values{}
		v.Set("value", "started")
		resp, err := etcdtest.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/_etcd/registry/2/_state"), v)
		assert.Equal(t, resp.StatusCode, http.StatusCreated)

		v = url.Values{}
		v.Set("value", "http://127.0.0.1:49151")
		resp, err = etcdtest.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/_etcd/registry/2/ETCDTEST"), v)
		assert.Equal(t, resp.StatusCode, http.StatusCreated)

		proc, err := startServer([]string{"-retry-interval", "0.2", "-discovery", s.URL() + "/v2/keys/_etcd/registry/2"})
		if err != nil {
			t.Fatal(err.Error())
		}
		defer stopServer(proc)

		// TODO(bp): etcd will take 30 seconds to shut down; figure this
		// out instead
		time.Sleep(1 * time.Second)

		client := http.Client{}
		_, err = client.Get("/")
		if err != nil && strings.Contains(err.Error(), "connection reset by peer") {
			t.Fatal(err.Error())
		}
	})
}

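// assertServerNotUp verifies that the etcd server is not accepting requests:
// a refused connection counts as down, while a successful POST is an error.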
func assertServerNotUp(client http.Client, scheme string) error {
	path := fmt.Sprintf("%s://127.0.0.1:4001/v2/keys/foo", scheme)
	fields := url.Values(map[string][]string{"value": {"bar"}})

	for i := 0; i < 10; i++ {
		time.Sleep(1 * time.Second)

		_, err := client.PostForm(path, fields)
		if err == nil {
			return errors.New("Expected error during POST, got nil")
		} else {
			errString := err.Error()
			if strings.Contains(errString, "connection refused") {
				return nil
			} else {
				return err
			}
		}
	}

	return nil
}
@@ -1,36 +0,0 @@
// +build ignore

package test

import (
	"net/http"
	"os"
	"testing"
	"time"
)

func BenchmarkEtcdDirectCall(b *testing.B) {
	templateBenchmarkEtcdDirectCall(b, false)
}

func BenchmarkEtcdDirectCallTls(b *testing.B) {
	templateBenchmarkEtcdDirectCall(b, true)
}

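// templateBenchmarkEtcdDirectCall measures direct HTTP GETs against a
// three-node cluster, optionally started with TLS peers.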
func templateBenchmarkEtcdDirectCall(b *testing.B, tls bool) {
	procAttr := new(os.ProcAttr)
	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}

	clusterSize := 3
	_, etcds, _ := CreateCluster(clusterSize, procAttr, tls)

	defer DestroyCluster(etcds)

	time.Sleep(time.Second)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		resp, _ := http.Get("http://127.0.0.1:4001/test/speed")
		resp.Body.Close()
	}
}
@@ -1,270 +0,0 @@
// +build ignore

package test

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	"os"
	"strings"
	"testing"
	"time"
)

// TestTLSOff asserts that non-TLS-encrypted communication between the
// etcd server and an unauthenticated client works
func TestTLSOff(t *testing.T) {
	proc, err := startServer([]string{})
	if err != nil {
		t.Fatal(err.Error())
	}
	defer stopServer(proc)

	client := buildClient()
	err = assertServerFunctional(client, "http")
	if err != nil {
		t.Fatal(err.Error())
	}
}

// TestTLSAnonymousClient asserts that TLS-encrypted communication between the etcd
// server and an anonymous client works
func TestTLSAnonymousClient(t *testing.T) {
	proc, err := startServer([]string{
		"-cert-file=../../fixtures/ca/server.crt",
		"-key-file=../../fixtures/ca/server.key.insecure",
	})
	if err != nil {
		t.Fatal(err.Error())
	}
	defer stopServer(proc)

	cacertfile := "../../fixtures/ca/ca.crt"

	cp := x509.NewCertPool()
	bytes, err := ioutil.ReadFile(cacertfile)
	if err != nil {
		panic(err)
	}
	cp.AppendCertsFromPEM(bytes)

	cfg := tls.Config{}
	cfg.RootCAs = cp

	client := buildTLSClient(&cfg)
	err = assertServerFunctional(client, "https")
	if err != nil {
		t.Fatal(err)
	}
}

// TestTLSAuthenticatedClient asserts that TLS-encrypted communication
// between the etcd server and an authenticated client works
func TestTLSAuthenticatedClient(t *testing.T) {
	proc, err := startServer([]string{
		"-cert-file=../../fixtures/ca/server.crt",
		"-key-file=../../fixtures/ca/server.key.insecure",
		"-ca-file=../../fixtures/ca/ca.crt",
	})
	if err != nil {
		t.Fatal(err.Error())
	}
	defer stopServer(proc)

	cacertfile := "../../fixtures/ca/ca.crt"
	certfile := "../../fixtures/ca/server2.crt"
	keyfile := "../../fixtures/ca/server2.key.insecure"

	cert, err := tls.LoadX509KeyPair(certfile, keyfile)
	if err != nil {
		panic(err)
	}

	cp := x509.NewCertPool()
	bytes, err := ioutil.ReadFile(cacertfile)
	if err != nil {
		panic(err)
	}
	cp.AppendCertsFromPEM(bytes)

	cfg := tls.Config{}
	cfg.Certificates = []tls.Certificate{cert}
	cfg.RootCAs = cp

	time.Sleep(time.Second)

	client := buildTLSClient(&cfg)
	err = assertServerFunctional(client, "https")
	if err != nil {
		t.Fatal(err)
	}
}

// TestTLSUnauthenticatedClient asserts that TLS-encrypted communication
// between the etcd server and an unauthenticated client fails
func TestTLSUnauthenticatedClient(t *testing.T) {
	proc, err := startServer([]string{
		"-cert-file=../../fixtures/ca/server.crt",
		"-key-file=../../fixtures/ca/server.key.insecure",
		"-ca-file=../../fixtures/ca/ca.crt",
	})
	if err != nil {
		t.Fatal(err.Error())
	}
	defer stopServer(proc)

	cacertfile := "../../fixtures/ca/ca.crt"
	certfile := "../../fixtures/ca/broken_server.crt"
	keyfile := "../../fixtures/ca/broken_server.key.insecure"

	cert, err := tls.LoadX509KeyPair(certfile, keyfile)
	if err != nil {
		panic(err)
	}

	cp := x509.NewCertPool()
	bytes, err := ioutil.ReadFile(cacertfile)
	if err != nil {
		panic(err)
	}
	cp.AppendCertsFromPEM(bytes)

	cfg := tls.Config{}
	cfg.Certificates = []tls.Certificate{cert}
	cfg.RootCAs = cp

	time.Sleep(time.Second)

	client := buildTLSClient(&cfg)
	err = assertServerNotFunctional(client, "https")
	if err != nil {
		t.Fatal(err)
	}
}

func buildClient() http.Client {
	return http.Client{}
}

func buildTLSClient(tlsConf *tls.Config) http.Client {
	tr := http.Transport{TLSClientConfig: tlsConf}
	return http.Client{Transport: &tr}
}

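// startServer launches a single etcd node with a fresh log (-f) plus any
// extra arguments the caller passes in.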
func startServer(extra []string) (*os.Process, error) {
	procAttr := new(os.ProcAttr)
	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}

	cmd := []string{"etcd", "-f", "-data-dir=/tmp/node1", "-name=node1"}
	cmd = append(cmd, extra...)

	println(strings.Join(cmd, " "))

	return os.StartProcess(EtcdBinPath, cmd, procAttr)
}

// TODO(yichengq): refactor these helper functions in #645
func startServer2(extra []string) (*os.Process, error) {
	procAttr := new(os.ProcAttr)
	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}

	cmd := []string{"etcd", "-f", "-data-dir=/tmp/node2", "-name=node2"}
	cmd = append(cmd, extra...)

	fmt.Println(strings.Join(cmd, " "))

	return os.StartProcess(EtcdBinPath, cmd, procAttr)
}

func startServerWithDataDir(extra []string) (*os.Process, error) {
	procAttr := new(os.ProcAttr)
	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}

	cmd := []string{"etcd", "-data-dir=/tmp/node1", "-name=node1"}
	cmd = append(cmd, extra...)

	fmt.Println(strings.Join(cmd, " "))

	return os.StartProcess(EtcdBinPath, cmd, procAttr)
}

func startServer2WithDataDir(extra []string) (*os.Process, error) {
	procAttr := new(os.ProcAttr)
	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}

	cmd := []string{"etcd", "-data-dir=/tmp/node2", "-name=node2"}
	cmd = append(cmd, extra...)

	println(strings.Join(cmd, " "))

	return os.StartProcess(EtcdBinPath, cmd, procAttr)
}

func stopServer(proc *os.Process) {
	err := proc.Kill()
	if err != nil {
		panic(err.Error())
	}
	proc.Release()
}

func assertServerFunctional(client http.Client, scheme string) error {
	path := fmt.Sprintf("%s://127.0.0.1:4001/v2/keys/foo", scheme)
	fields := url.Values(map[string][]string{"value": {"bar"}})

	for i := 0; i < 10; i++ {
		time.Sleep(1 * time.Second)

		resp, err := client.PostForm(path, fields)
		// If the status is Temporary Redirect, we should follow the
		// new location, because the request did not go to the leader yet.
		// TODO(yichengq): the difference between Temporary Redirect(307)
		// and Created(201) could distinguish between leader and followers
		for err == nil && resp.StatusCode == http.StatusTemporaryRedirect {
			loc, _ := resp.Location()
			newPath := loc.String()
			resp, err = client.PostForm(newPath, fields)
		}

		if err == nil {
			// Internal error may mean that servers are in leader election
			if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusInternalServerError {
				return fmt.Errorf("resp.StatusCode == %s", resp.Status)
			} else {
				return nil
			}
		}
	}

	return errors.New("etcd server was not reachable in time / had internal error")
}

func assertServerNotFunctional(client http.Client, scheme string) error {
	path := fmt.Sprintf("%s://127.0.0.1:4001/v2/keys/foo", scheme)
	fields := url.Values(map[string][]string{"value": {"bar"}})

	for i := 0; i < 10; i++ {
		time.Sleep(1 * time.Second)

		_, err := client.PostForm(path, fields)
		if err == nil {
			return errors.New("Expected error during POST, got nil")
		} else {
			errString := err.Error()
			if strings.Contains(errString, "connection refused") {
				continue
			} else if strings.Contains(errString, "bad certificate") {
				return nil
			} else {
				return err
			}
		}
	}

	return errors.New("Expected server to fail with 'bad certificate'")
}
@@ -1,19 +0,0 @@
// +build ignore

package test

import (
	"go/build"
	"os"
	"path/filepath"
)

var EtcdBinPath string

func init() {
	// Initialize the 'etcd' binary path or default it to the etcd directory.
	EtcdBinPath = os.Getenv("ETCD_BIN_PATH")
	if EtcdBinPath == "" {
		EtcdBinPath = filepath.Join(build.Default.GOPATH, "src", "github.com", "coreos", "etcd", "etcd")
	}
}
@@ -1,65 +0,0 @@
// +build ignore

package test

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
	"os"
	"sync"
	"testing"
	"time"
)

// Ensure that etcd does not come up if the internal raft versions do not match.
func TestInternalVersion(t *testing.T) {
	var mu sync.Mutex

	checkedVersion := false
	testMux := http.NewServeMux()

	testMux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "This is not a version number")
		mu.Lock()
		defer mu.Unlock()

		checkedVersion = true
	})

	testMux.HandleFunc("/join", func(w http.ResponseWriter, r *http.Request) {
		t.Fatal("should not attempt to join!")
	})

	ts := httptest.NewServer(testMux)
	defer ts.Close()

	fakeURL, _ := url.Parse(ts.URL)

	procAttr := new(os.ProcAttr)
	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
	args := []string{"etcd", "-name=node1", "-f", "-data-dir=/tmp/node1", "-peers=" + fakeURL.Host}

	process, err := os.StartProcess(EtcdBinPath, args, procAttr)
	if err != nil {
		t.Fatal("start process failed:" + err.Error())
		return
	}

	time.Sleep(time.Second)
	process.Kill()

	_, err = http.Get("http://127.0.0.1:4001")
	if err == nil {
		t.Fatal("etcd node should not be up")
		return
	}

	mu.Lock()
	defer mu.Unlock()
	if checkedVersion == false {
		t.Fatal("etcd did not check the version")
		return
	}
}
@@ -1,103 +0,0 @@
// +build ignore

package test

import (
	"bytes"
	"fmt"
	"os"
	"strconv"
	"strings"
	"testing"
	"time"

	"github.com/coreos/etcd/server"
	"github.com/coreos/etcd/tests"
	"github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
	"github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert"
)

// This test kills the current leader and waits for the etcd cluster to
// elect a new leader, once for each member of the cluster.
// It prints out each election time and the average election time.
// It runs in a cluster with standby nodes.
func TestKillLeaderWithStandbys(t *testing.T) {
	// https://github.com/goraft/raft/issues/222
	t.Skip("stuck on raft issue")

	procAttr := new(os.ProcAttr)
	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}

	clusterSize := 5
	argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false)
	if err != nil {
		t.Fatal("cannot create cluster")
	}
	defer DestroyCluster(etcds)

	stop := make(chan bool)
	leaderChan := make(chan string, 1)
	all := make(chan bool, 1)

	time.Sleep(time.Second)

	go Monitor(clusterSize, 1, leaderChan, all, stop)

	c := etcd.NewClient(nil)
	c.SyncCluster()

	// Reconfigure with a small active size.
	resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3, "removeDelay":2, "syncInterval":1}`))
	if !assert.Equal(t, resp.StatusCode, 200) {
		t.FailNow()
	}

	// Wait for two monitor cycles before checking for demotion.
	time.Sleep((2 * server.ActiveMonitorTimeout) + (2 * time.Second))

	// Verify that we have 3 peers.
	result, err := c.Get("_etcd/machines", true, true)
	assert.NoError(t, err)
	assert.Equal(t, len(result.Node.Nodes), 3)

	var totalTime time.Duration

	leader := "http://127.0.0.1:7001"

	for i := 0; i < clusterSize; i++ {
		t.Log("leader is ", leader)
		port, _ := strconv.Atoi(strings.Split(leader, ":")[2])
		num := port - 7001
		t.Log("kill server ", num)
		etcds[num].Kill()
		etcds[num].Release()

		start := time.Now()
		for {
			newLeader := <-leaderChan
			if newLeader != leader {
				leader = newLeader
				break
			}
		}
		take := time.Now().Sub(start)

		totalTime += take
		avgTime := totalTime / (time.Duration)(i+1)
		fmt.Println("Total time:", totalTime, "; Avg time:", avgTime)

		time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second))
		time.Sleep(2 * time.Second)

		// Verify that we have 3 peers.
		result, err = c.Get("_etcd/machines", true, true)
		assert.NoError(t, err)
		assert.Equal(t, len(result.Node.Nodes), 3)

		// Verify that the killed node is not one of those peers.
		_, err = c.Get(fmt.Sprintf("_etcd/machines/node%d", num+1), false, false)
		assert.Error(t, err)

		etcds[num], err = os.StartProcess(EtcdBinPath, argGroup[num], procAttr)
	}
	stop <- true
}
@@ -1,248 +0,0 @@
// +build ignore

package test

import (
	"bytes"
	"os"
	"strconv"
	"strings"
	"testing"
	"time"

	"github.com/coreos/etcd/server"
	"github.com/coreos/etcd/tests"
	"github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
	"github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert"
)

// TestTLSMultiNodeKillAllAndRecovery creates a five-node cluster,
// then kills all the nodes and restarts them
func TestTLSMultiNodeKillAllAndRecovery(t *testing.T) {
	procAttr := new(os.ProcAttr)
	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}

	stop := make(chan bool)
	leaderChan := make(chan string, 1)
	all := make(chan bool, 1)

	clusterSize := 5
	argGroup, etcds, err := CreateCluster(clusterSize, procAttr, true)
	defer DestroyCluster(etcds)

	if err != nil {
		t.Fatal("cannot create cluster")
	}

	time.Sleep(time.Second)

	c := etcd.NewClient(nil)

	go Monitor(clusterSize, clusterSize, leaderChan, all, stop)
	<-all
	<-leaderChan
	stop <- true

	c.SyncCluster()

	// send 10 commands
	for i := 0; i < 10; i++ {
		// Test Set
		_, err := c.Set("foo", "bar", 0)
		if err != nil {
			panic(err)
		}
	}

	time.Sleep(time.Second)

	// kill all
	DestroyCluster(etcds)

	time.Sleep(time.Second)

	stop = make(chan bool)
	leaderChan = make(chan string, 1)
	all = make(chan bool, 1)

	time.Sleep(time.Second)

	for i := 0; i < clusterSize; i++ {
		etcds[i], err = os.StartProcess(EtcdBinPath, argGroup[i], procAttr)
		// See util.go for the reason to wait for the server
		client := buildClient()
		err = WaitForServer("127.0.0.1:400"+strconv.Itoa(i+1), client, "http")
		if err != nil {
			t.Fatalf("node start error: %s", err)
		}
	}

	go Monitor(clusterSize, 1, leaderChan, all, stop)

	<-all
	<-leaderChan

	result, err := c.Set("foo", "bar", 0)

	if err != nil {
		t.Fatalf("Recovery error: %s", err)
	}

	if result.Node.ModifiedIndex != 17 {
		t.Fatalf("recovery failed! [%d/17]", result.Node.ModifiedIndex)
	}
}

// Create a fifteen-node cluster with standbys
// Kill all the nodes and restart
func TestMultiNodeKillAllAndRecoveryWithStandbys(t *testing.T) {
	procAttr := new(os.ProcAttr)
	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}

	stop := make(chan bool)
	leaderChan := make(chan string, 1)
	all := make(chan bool, 1)

	clusterSize := 15
	argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false)
	defer DestroyCluster(etcds)

	if err != nil {
		t.Fatal("cannot create cluster")
	}

	c := etcd.NewClient(nil)

	go Monitor(clusterSize, clusterSize, leaderChan, all, stop)
	<-all
	<-leaderChan
	stop <- true

	c.SyncCluster()

	// Reconfigure with a smaller active size (7 nodes) and wait for the removal.
	resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":7}`))
	if !assert.Equal(t, resp.StatusCode, 200) {
		t.FailNow()
	}

	time.Sleep(2*server.ActiveMonitorTimeout + (1 * time.Second))

	// Verify that seven machines are in peer mode.
	result, err := c.Get("_etcd/machines", false, true)
	assert.NoError(t, err)
	assert.Equal(t, len(result.Node.Nodes), 7)

	// send set commands
	for i := 0; i < 2*clusterSize; i++ {
		// Test Set
		_, err := c.Set("foo", "bar", 0)
		if err != nil {
			panic(err)
		}
	}

	time.Sleep(time.Second)

	// kill all
	DestroyCluster(etcds)

	time.Sleep(time.Second)

	stop = make(chan bool)
	leaderChan = make(chan string, 1)
	all = make(chan bool, 1)

	time.Sleep(time.Second)

	for i := 0; i < clusterSize; i++ {
		etcds[i], err = os.StartProcess(EtcdBinPath, append(argGroup[i], "-peers="), procAttr)
	}

	time.Sleep(2 * time.Second)

	// send set commands
	for i := 0; i < 2*clusterSize; i++ {
		// Test Set
		_, err := c.Set("foo", "bar", 0)
		if err != nil {
			t.Fatalf("Recovery error: %s", err)
		}
	}

	// Verify that we have seven machines.
	result, err = c.Get("_etcd/machines", false, true)
	assert.NoError(t, err)
	assert.Equal(t, len(result.Node.Nodes), 7)
}

// Create a five-node cluster
// Kill all the nodes and restart, then remove the leader
func TestMultiNodeKillAllAndRecoveryAndRemoveLeader(t *testing.T) {
	procAttr := new(os.ProcAttr)
	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}

	stop := make(chan bool)
	leaderChan := make(chan string, 1)
	all := make(chan bool, 1)

	clusterSize := 5
	argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false)
	defer DestroyCluster(etcds)

	if err != nil {
		t.Fatal("cannot create cluster")
	}

	c := etcd.NewClient(nil)

	go Monitor(clusterSize, clusterSize, leaderChan, all, stop)
	<-all
	<-leaderChan
	stop <- true

	// It needs some time to sync the current commits and write them to disk.
	// Otherwise some instance may be restarted as a new peer, and currently
	// an old cluster that does not have a majority alive cannot be rejoined
	// without the log.
	time.Sleep(time.Second)

	c.SyncCluster()

	// kill all
	DestroyCluster(etcds)

	time.Sleep(time.Second)

	stop = make(chan bool)
	leaderChan = make(chan string, 1)
	all = make(chan bool, 1)

	time.Sleep(time.Second)

	for i := 0; i < clusterSize; i++ {
		etcds[i], err = os.StartProcess(EtcdBinPath, argGroup[i], procAttr)
	}

	go Monitor(clusterSize, 1, leaderChan, all, stop)

	<-all
	leader := <-leaderChan

	_, err = c.Set("foo", "bar", 0)
	if err != nil {
		t.Fatalf("Recovery error: %s", err)
	}

	port, _ := strconv.Atoi(strings.Split(leader, ":")[2])
	num := port - 7000
	resp, _ := tests.Delete(leader+"/v2/admin/machines/node"+strconv.Itoa(num), "application/json", nil)
	if !assert.Equal(t, resp.StatusCode, 200) {
		t.FailNow()
	}

	// check that the old leader is in standby mode now
	time.Sleep(time.Second)
	resp, _ = tests.Get(leader + "/name")
	assert.Equal(t, resp.StatusCode, 404)
}
@@ -1,195 +0,0 @@
// +build ignore

package test

import (
	"fmt"
	"math/rand"
	"os"
	"strconv"
	"strings"
	"testing"
	"time"

	"github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
)

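// The helpers below rewrite a node's startup arguments so it can be
// restarted with a shifted peer address, client address, or data directory.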
func increasePeerAddressPort(args []string, delta int) []string {
	for i, arg := range args {
		if !strings.Contains(arg, "peer-addr") {
			continue
		}
		splitArg := strings.Split(arg, ":")
		port, _ := strconv.Atoi(splitArg[len(splitArg)-1])
		args[i] = "-peer-addr=127.0.0.1:" + strconv.Itoa(port+delta)
		return args
	}
	return append(args, "-peer-addr=127.0.0.1:"+strconv.Itoa(7001+delta))
}

func increaseAddressPort(args []string, delta int) []string {
	for i, arg := range args {
		if !strings.HasPrefix(arg, "-addr") && !strings.HasPrefix(arg, "--addr") {
			continue
		}
		splitArg := strings.Split(arg, ":")
		port, _ := strconv.Atoi(splitArg[len(splitArg)-1])
		args[i] = "-addr=127.0.0.1:" + strconv.Itoa(port+delta)
		return args
	}
	return append(args, "-addr=127.0.0.1:"+strconv.Itoa(4001+delta))
}

func increaseDataDir(args []string, delta int) []string {
	for i, arg := range args {
		if !strings.Contains(arg, "-data-dir") {
			continue
		}
		splitArg := strings.Split(arg, "node")
		idx, _ := strconv.Atoi(splitArg[len(splitArg)-1])
		args[i] = "-data-dir=/tmp/node" + strconv.Itoa(idx+delta)
		return args
	}
	return args
}

// Create a five-node cluster
// Randomly kill one of the nodes and restart it with a different peer address
func TestRejoinWithDifferentPeerAddress(t *testing.T) {
	procAttr := new(os.ProcAttr)
	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}

	clusterSize := 5
	argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false)

	if err != nil {
		t.Fatal("cannot create cluster")
	}

	defer DestroyCluster(etcds)

	time.Sleep(2 * time.Second)

	for i := 0; i < 10; i++ {
		num := rand.Int() % clusterSize
		fmt.Println("kill node", num+1)

		etcds[num].Kill()
		etcds[num].Release()
		time.Sleep(time.Second)

		argGroup[num] = increasePeerAddressPort(argGroup[num], clusterSize)
		// restart
		etcds[num], err = os.StartProcess(EtcdBinPath, argGroup[num], procAttr)
		if err != nil {
			panic(err)
		}
		time.Sleep(time.Second)
	}

	c := etcd.NewClient(nil)
	c.SyncCluster()
	result, err := c.Set("foo", "bar", 0)
	if err != nil || result.Node.Key != "/foo" || result.Node.Value != "bar" {
		t.Fatal("Failed to set value in etcd cluster")
	}
}

// Create a five-node cluster
// Replace one of the nodes with a different peer address
func TestReplaceWithDifferentPeerAddress(t *testing.T) {
	// TODO(yichengq): find some way to avoid the error that will be
	// caused if some node joins the cluster with a colliding name.
	// Possible solutions:
	// 1. Remove itself when executing a join command with the same name
	// and a different peer address. However, it would need some way to
	// trigger that execution, because the leader may update its address
	// and stop heartbeating.
	// 2. Remove the node with the same name before each join.
	// But this way could be rather overkill.
	t.Skip("Unimplemented functionality")

	procAttr := new(os.ProcAttr)
	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}

	clusterSize := 5
	argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false)

	if err != nil {
		t.Fatal("cannot create cluster")
	}

	defer DestroyCluster(etcds)

	time.Sleep(2 * time.Second)

	rand.Int()
	for i := 0; i < 10; i++ {
		num := rand.Int() % clusterSize
		fmt.Println("replace node", num+1)

		argGroup[num] = increasePeerAddressPort(argGroup[num], clusterSize)
		argGroup[num] = increaseAddressPort(argGroup[num], clusterSize)
		argGroup[num] = increaseDataDir(argGroup[num], clusterSize)
		// restart
		newEtcd, err := os.StartProcess(EtcdBinPath, append(argGroup[num], "-f"), procAttr)
		if err != nil {
			panic(err)
		}

		etcds[num].Wait()
		etcds[num] = newEtcd
	}

	c := etcd.NewClient(nil)
	c.SyncCluster()
	result, err := c.Set("foo", "bar", 0)
	if err != nil || result.Node.Key != "/foo" || result.Node.Value != "bar" {
		t.Fatal("Failed to set value in etcd cluster")
	}
}

// Create a five-node cluster
// Let a sixth instance join with a different name and an existing peer address
func TestRejoinWithDifferentName(t *testing.T) {
	procAttr := new(os.ProcAttr)
	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}

	clusterSize := 5
	argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false)

	if err != nil {
		t.Fatal("cannot create cluster")
	}

	defer DestroyCluster(etcds)

	time.Sleep(2 * time.Second)

	num := rand.Int() % clusterSize
	fmt.Println("join node 6 that collides with node", num+1)

	// kill
	etcds[num].Kill()
	etcds[num].Release()
	time.Sleep(time.Second)

	for i := 0; i < 2; i++ {
		// restart
		if i == 0 {
			etcds[num], err = os.StartProcess(EtcdBinPath, append(argGroup[num], "-name=node6", "-peers=127.0.0.1:7002"), procAttr)
		} else {
			etcds[num], err = os.StartProcess(EtcdBinPath, append(argGroup[num], "-f", "-name=node6", "-peers=127.0.0.1:7002"), procAttr)
		}
		if err != nil {
			t.Fatal("failed to start process:", err)
		}

		timer := time.AfterFunc(10*time.Second, func() {
			t.Fatal("new etcd should fail immediately")
		})
		etcds[num].Wait()
		etcds[num] = nil
		timer.Stop()
	}
}
@@ -1,200 +0,0 @@
// +build ignore

package test

import (
	"bytes"
	"fmt"
	"math/rand"
	"net/http"
	"os"
	"syscall"
	"testing"
	"time"

	"github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"

	"github.com/coreos/etcd/tests"
	"github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert"
)

// Remove a node, then let the node rejoin with its previous log.
func TestRemoveNode(t *testing.T) {
	procAttr := new(os.ProcAttr)
	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}

	clusterSize := 4
	argGroup, etcds, _ := CreateCluster(clusterSize, procAttr, false)
	defer DestroyCluster(etcds)

	time.Sleep(time.Second)

	c := etcd.NewClient(nil)

	c.SyncCluster()

	resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":4, "syncInterval":5}`))
	if !assert.Equal(t, resp.StatusCode, 200) {
		t.FailNow()
	}

	rmReq, _ := http.NewRequest("DELETE", "http://127.0.0.1:7001/remove/node3", nil)

	client := &http.Client{}
	for i := 0; i < 2; i++ {
		for i := 0; i < 2; i++ {
			client.Do(rmReq)

			fmt.Println("send remove to node3 and wait for its exiting")
			time.Sleep(100 * time.Millisecond)

			resp, err := c.Get("_etcd/machines", false, false)

			if err != nil {
				panic(err)
			}

			if len(resp.Node.Nodes) != 3 {
				t.Fatal("cannot remove peer")
			}

			etcds[2].Kill()
			etcds[2].Wait()

			if i == 1 {
				// rejoin with log
				etcds[2], err = os.StartProcess(EtcdBinPath, argGroup[2], procAttr)
			} else {
				// rejoin without log
				etcds[2], err = os.StartProcess(EtcdBinPath, append(argGroup[2], "-f"), procAttr)
			}

			if err != nil {
				panic(err)
			}

			time.Sleep(time.Second + 5*time.Second)

			resp, err = c.Get("_etcd/machines", false, false)

			if err != nil {
				panic(err)
			}

			if len(resp.Node.Nodes) != 4 {
				t.Fatalf("add peer fails #1 (%d != 4)", len(resp.Node.Nodes))
			}
		}

		// first kill the node, then remove it, then add it back
		for i := 0; i < 2; i++ {
			etcds[2].Kill()
			fmt.Println("kill node3 and wait for its exiting")
			etcds[2].Wait()

			client.Do(rmReq)

			time.Sleep(100 * time.Millisecond)

			resp, err := c.Get("_etcd/machines", false, false)

			if err != nil {
				panic(err)
			}

			if len(resp.Node.Nodes) != 3 {
				t.Fatal("cannot remove peer")
			}

			if i == 1 {
				// rejoin with log
				etcds[2], err = os.StartProcess(EtcdBinPath, argGroup[2], procAttr)
			} else {
				// rejoin without log
				etcds[2], err = os.StartProcess(EtcdBinPath, append(argGroup[2], "-f"), procAttr)
			}

			if err != nil {
				panic(err)
			}

			time.Sleep(time.Second + time.Second)

			resp, err = c.Get("_etcd/machines", false, false)

			if err != nil {
				panic(err)
			}

			if len(resp.Node.Nodes) != 4 {
				t.Fatalf("add peer fails #2 (%d != 4)", len(resp.Node.Nodes))
			}
		}
	}
}

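// TestRemovePausedNode repeatedly pauses a random node with SIGSTOP, waits
// for a standby to take its place, and verifies that the paused node has
// been dropped from the active peers.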
func TestRemovePausedNode(t *testing.T) {
	procAttr := new(os.ProcAttr)
	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}

	clusterSize := 4
	_, etcds, _ := CreateCluster(clusterSize, procAttr, false)
	defer DestroyCluster(etcds)

	time.Sleep(time.Second)

	c := etcd.NewClient(nil)

	c.SyncCluster()

	r, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3, "removeDelay":1, "syncInterval":1}`))
	if !assert.Equal(t, r.StatusCode, 200) {
		t.FailNow()
	}
	// Wait for the standby instances to update their cluster config
	time.Sleep(6 * time.Second)

	resp, err := c.Get("_etcd/machines", false, false)
	if err != nil {
		panic(err)
	}
	if len(resp.Node.Nodes) != 3 {
		t.Fatal("cannot remove peer")
	}

	for i := 0; i < clusterSize; i++ {
		// first pause the node, then remove it, then resume it
		idx := rand.Int() % clusterSize

		etcds[idx].Signal(syscall.SIGSTOP)
		fmt.Printf("pause node%d and let the standby node take its place\n", idx+1)

		time.Sleep(4 * time.Second)

		etcds[idx].Signal(syscall.SIGCONT)
		// let it change its state to candidate at least
		time.Sleep(time.Second)

		stop := make(chan bool)
		leaderChan := make(chan string, 1)
		all := make(chan bool, 1)

		go Monitor(clusterSize, clusterSize, leaderChan, all, stop)
		<-all
		<-leaderChan
		stop <- true

		resp, err = c.Get("_etcd/machines", false, false)
		if err != nil {
			panic(err)
		}
		if len(resp.Node.Nodes) != 3 {
			t.Fatalf("add peer fails (%d != 3)", len(resp.Node.Nodes))
		}
		for i := 0; i < 3; i++ {
			if resp.Node.Nodes[i].Key == fmt.Sprintf("node%d", idx+1) {
				t.Fatal("node should be removed")
			}
		}
	}
}
@@ -1,67 +0,0 @@
// +build ignore

package test

import (
	"os"
	"testing"
	"time"

	"github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
)

func TestSimpleMultiNode(t *testing.T) {
	templateTestSimpleMultiNode(t, false)
}

func TestSimpleMultiNodeTls(t *testing.T) {
	templateTestSimpleMultiNode(t, true)
}

// Create a three-node cluster and try to set a value
func templateTestSimpleMultiNode(t *testing.T, tls bool) {
	procAttr := new(os.ProcAttr)
	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}

	clusterSize := 3

	_, etcds, err := CreateCluster(clusterSize, procAttr, tls)

	if err != nil {
		t.Fatalf("cannot create cluster: %v", err)
	}

	defer DestroyCluster(etcds)

	time.Sleep(time.Second)

	c := etcd.NewClient(nil)

	if c.SyncCluster() == false {
		t.Fatal("Cannot sync cluster!")
	}

	// Test Set
	result, err := c.Set("foo", "bar", 100)
	if err != nil {
		t.Fatal(err)
	}

	node := result.Node
	if node.Key != "/foo" || node.Value != "bar" || node.TTL < 95 {
		t.Fatalf("Set 1 failed with %s %s %v", node.Key, node.Value, node.TTL)
	}

	time.Sleep(time.Second)

	result, err = c.Set("foo", "bar", 100)
	if err != nil {
		t.Fatal(err)
	}

	node = result.Node
	if node.Key != "/foo" || node.Value != "bar" || node.TTL < 95 {
		t.Fatalf("Set 2 failed with %s %s %v", node.Key, node.Value, node.TTL)
	}
}
@@ -1,151 +0,0 @@
// +build ignore

package test

import (
	"io/ioutil"
	"os"
	"strconv"
	"testing"
	"time"

	"github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
)

// This test creates a single node and then sets values to trigger a snapshot.
func TestSnapshot(t *testing.T) {
	procAttr := new(os.ProcAttr)
	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
	args := []string{"etcd", "-name=node1", "-data-dir=/tmp/node1", "-snapshot=true", "-snapshot-count=500"}

	process, err := os.StartProcess(EtcdBinPath, append(args, "-f"), procAttr)
	if err != nil {
		t.Fatal("start process failed:" + err.Error())
	}
	defer process.Kill()

	time.Sleep(time.Second)

	c := etcd.NewClient(nil)

	c.SyncCluster()
	// issue first 501 commands
	for i := 0; i < 501; i++ {
		result, err := c.Set("foo", "bar", 100)
		node := result.Node

		if err != nil || node.Key != "/foo" || node.Value != "bar" || node.TTL < 95 {
			if err != nil {
				t.Fatal(err)
			}

			t.Fatalf("Set failed with %s %s %v", node.Key, node.Value, node.TTL)
		}
	}

	// wait for a snapshot interval
	time.Sleep(3 * time.Second)

	snapshots, err := ioutil.ReadDir("/tmp/node1/snapshot")

	if err != nil {
		t.Fatal("list snapshot failed:" + err.Error())
	}

	if len(snapshots) != 1 {
		t.Fatal("wrong number of snapshots :[1/", len(snapshots), "]")
	}

	index, _ := strconv.Atoi(snapshots[0].Name()[2:5])

	if index < 503 || index > 516 {
		t.Fatal("wrong name of snapshot :", snapshots[0].Name())
	}

	// issue second 501 commands
	for i := 0; i < 501; i++ {
		result, err := c.Set("foo", "bar", 100)
		node := result.Node

		if err != nil || node.Key != "/foo" || node.Value != "bar" || node.TTL < 95 {
			if err != nil {
				t.Fatal(err)
			}

			t.Fatalf("Set failed with %s %s %v", node.Key, node.Value, node.TTL)
		}
	}

	// wait for a snapshot interval
	time.Sleep(3 * time.Second)

	snapshots, err = ioutil.ReadDir("/tmp/node1/snapshot")

	if err != nil {
		t.Fatal("list snapshot failed:" + err.Error())
	}

	if len(snapshots) != 1 {
		t.Fatal("wrong number of snapshots :[1/", len(snapshots), "]")
	}

	index, _ = strconv.Atoi(snapshots[0].Name()[2:6])

	if index < 1010 || index > 1029 {
		t.Fatal("wrong name of snapshot :", snapshots[0].Name())
	}
}

// TestSnapshotRestart tests that etcd restarts from a snapshot file.
func TestSnapshotRestart(t *testing.T) {
	procAttr := new(os.ProcAttr)
	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
	args := []string{"etcd", "-name=node1", "-data-dir=/tmp/node1", "-snapshot=true", "-snapshot-count=500"}

	process, err := os.StartProcess(EtcdBinPath, append(args, "-f"), procAttr)
	if err != nil {
		t.Fatal("start process failed:" + err.Error())
	}

	time.Sleep(time.Second)

	c := etcd.NewClient(nil)

	c.SyncCluster()
	// issue first 501 commands
	for i := 0; i < 501; i++ {
		result, err := c.Set("foo", "bar", 100)
		node := result.Node

		if err != nil || node.Key != "/foo" || node.Value != "bar" || node.TTL < 95 {
			if err != nil {
				t.Fatal(err)
			}

			t.Fatalf("Set failed with %s %s %v", node.Key, node.Value, node.TTL)
		}
	}

	// wait for a snapshot interval
	time.Sleep(3 * time.Second)

	_, err = ioutil.ReadDir("/tmp/node1/snapshot")
	if err != nil {
		t.Fatal("list snapshot failed:" + err.Error())
	}

	process.Kill()

	process, err = os.StartProcess(EtcdBinPath, args, procAttr)
	if err != nil {
		t.Fatal("start process failed:" + err.Error())
	}
	defer process.Kill()

	time.Sleep(1 * time.Second)

	_, err = c.Set("foo", "bar", 100)
	if err != nil {
		t.Fatal(err)
	}
}
@@ -1,342 +0,0 @@
// +build ignore

package test

import (
	"bytes"
	"fmt"
	"os"
	"testing"
	"time"

	"github.com/coreos/etcd/server"
	"github.com/coreos/etcd/store"
	"github.com/coreos/etcd/tests"
	"github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
	"github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert"
)

// Create a full cluster and then change the active size.
func TestStandby(t *testing.T) {
	clusterSize := 15
	_, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
	if !assert.NoError(t, err) {
		t.Fatal("cannot create cluster")
	}
	defer DestroyCluster(etcds)

	resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"syncInterval":1}`))
	if !assert.Equal(t, resp.StatusCode, 200) {
		t.FailNow()
	}

	time.Sleep(time.Second)
	c := etcd.NewClient(nil)
	c.SyncCluster()

	// Verify that we start with the default nine machines.
	result, err := c.Get("_etcd/machines", false, true)
	assert.NoError(t, err)
	assert.Equal(t, len(result.Node.Nodes), 9)

	t.Log("Reconfigure with a smaller active size")
	resp, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":7, "syncInterval":1}`))
	if !assert.Equal(t, resp.StatusCode, 200) {
		t.FailNow()
	}

	// Wait for two monitor cycles before checking for demotion.
	time.Sleep((2 * server.ActiveMonitorTimeout) + (2 * time.Second))

	// Verify that we now have seven peers.
	result, err = c.Get("_etcd/machines", false, true)
	assert.NoError(t, err)
	assert.Equal(t, len(result.Node.Nodes), 7)

	t.Log("Test the functionality of all servers")
	// Set key.
	time.Sleep(time.Second)
	if _, err := c.Set("foo", "bar", 0); err != nil {
		panic(err)
	}
	time.Sleep(time.Second)

	// Check that all peers and standbys have the value.
	for i := range etcds {
		resp, err := tests.Get(fmt.Sprintf("http://localhost:%d/v2/keys/foo", 4000+(i+1)))
		if assert.NoError(t, err) {
			body := tests.ReadBodyJSON(resp)
			if node, _ := body["node"].(map[string]interface{}); assert.NotNil(t, node) {
				assert.Equal(t, node["value"], "bar")
			}
		}
	}

	t.Log("Reconfigure with a larger active size and wait for join")
	resp, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":8, "syncInterval":1}`))
	if !assert.Equal(t, resp.StatusCode, 200) {
		t.FailNow()
	}

	time.Sleep((1 * time.Second) + (1 * time.Second))

	// Verify that exactly eight machines are in the cluster.
	result, err = c.Get("_etcd/machines", false, true)
	assert.NoError(t, err)
	assert.Equal(t, len(result.Node.Nodes), 8)
}

// Create a full cluster, disconnect a peer, wait for removal, wait for standby join.
func TestStandbyAutoJoin(t *testing.T) {
	clusterSize := 5
	_, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
	if err != nil {
		t.Fatal("cannot create cluster")
	}
	defer func() {
		// Wrap this in a closure so that it picks up the updated version of
		// the "etcds" variable.
		DestroyCluster(etcds)
	}()

	c := etcd.NewClient(nil)
	c.SyncCluster()

	time.Sleep(1 * time.Second)

	// Verify that we have five machines.
	result, err := c.Get("_etcd/machines", false, true)
	assert.NoError(t, err)
	assert.Equal(t, len(result.Node.Nodes), 5)

	// Reconfigure with a short remove delay (2 seconds).
	resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":4, "removeDelay":2, "syncInterval":1}`))
	if !assert.Equal(t, resp.StatusCode, 200) {
		t.FailNow()
	}

	// Wait for a monitor cycle before checking for removal.
	time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second))

	// Verify that we now have four peers.
	result, err = c.Get("_etcd/machines", false, true)
	assert.NoError(t, err)
	assert.Equal(t, len(result.Node.Nodes), 4)

	// Remove peer.
	etcd := etcds[1]
	etcds = append(etcds[:1], etcds[2:]...)
	if err := etcd.Kill(); err != nil {
		panic(err.Error())
	}
	etcd.Release()

	// Wait for it to get dropped.
	time.Sleep(server.PeerActivityMonitorTimeout + (1 * time.Second))

	// Wait for the standby to join.
	time.Sleep((1 * time.Second) + (1 * time.Second))

	// Verify that we have 4 peers.
	result, err = c.Get("_etcd/machines", true, true)
	assert.NoError(t, err)
	assert.Equal(t, len(result.Node.Nodes), 4)

	// Verify that node2 is not one of those peers.
	_, err = c.Get("_etcd/machines/node2", false, false)
	assert.Error(t, err)
}

// Create a full cluster and then change the active size gradually.
func TestStandbyGradualChange(t *testing.T) {
	clusterSize := 9
	_, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
	assert.NoError(t, err)
	defer DestroyCluster(etcds)

	if err != nil {
		t.Fatal("cannot create cluster")
	}

	time.Sleep(time.Second)
	c := etcd.NewClient(nil)
	c.SyncCluster()

	num := clusterSize
	for inc := 0; inc < 2; inc++ {
		for i := 0; i < 6; i++ {
			// Verify that we currently have num machines.
			result, err := c.Get("_etcd/machines", false, true)
			assert.NoError(t, err)
			assert.Equal(t, len(result.Node.Nodes), num)

			if inc == 0 {
				num--
			} else {
				num++
			}

			t.Log("Reconfigure with active size", num)
			resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(fmt.Sprintf(`{"activeSize":%d, "syncInterval":1}`, num)))
			if !assert.Equal(t, resp.StatusCode, 200) {
				t.FailNow()
			}

			if inc == 0 {
				// Wait for monitor cycles before checking for demotion.
				time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second))
			} else {
				time.Sleep(time.Second + (1 * time.Second))
			}

			// Verify that we now have num peers.
			result, err = c.Get("_etcd/machines", false, true)
			assert.NoError(t, err)
			assert.Equal(t, len(result.Node.Nodes), num)

			t.Log("Test the functionality of all servers")
			// Set key.
			if _, err := c.Set("foo", "bar", 0); err != nil {
				panic(err)
			}
			time.Sleep(100 * time.Millisecond)

			// Check that all peers and standbys have the value.
			for i := range etcds {
				resp, err := tests.Get(fmt.Sprintf("http://localhost:%d/v2/keys/foo", 4000+(i+1)))
				if assert.NoError(t, err) {
					body := tests.ReadBodyJSON(resp)
					if node, _ := body["node"].(map[string]interface{}); assert.NotNil(t, node) {
						assert.Equal(t, node["value"], "bar")
					}
				}
			}
		}
	}
}

// Create a full cluster and then change the active size dramatically.
func TestStandbyDramaticChange(t *testing.T) {
	clusterSize := 9
	_, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
	assert.NoError(t, err)
	defer DestroyCluster(etcds)

	if err != nil {
		t.Fatal("cannot create cluster")
	}

	time.Sleep(time.Second)
	c := etcd.NewClient(nil)
	c.SyncCluster()

	num := clusterSize
	for i := 0; i < 3; i++ {
		for inc := 0; inc < 2; inc++ {
			// Verify that we currently have num machines.
			result, err := c.Get("_etcd/machines", false, true)
			assert.NoError(t, err)
			assert.Equal(t, len(result.Node.Nodes), num)

			if inc == 0 {
				num -= 6
			} else {
				num += 6
			}

			t.Log("Reconfigure with active size", num)
			resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(fmt.Sprintf(`{"activeSize":%d, "syncInterval":1}`, num)))
			if !assert.Equal(t, resp.StatusCode, 200) {
				t.FailNow()
			}

			if inc == 0 {
				// Wait for monitor cycles before checking for demotion.
				time.Sleep(6*server.ActiveMonitorTimeout + (1 * time.Second))
			} else {
				time.Sleep(time.Second + (1 * time.Second))
			}

			// Verify that we now have num peers.
			result, err = c.Get("_etcd/machines", false, true)
			assert.NoError(t, err)
			assert.Equal(t, len(result.Node.Nodes), num)

			t.Log("Test the functionality of all servers")
			// Set key.
			if _, err := c.Set("foo", "bar", 0); err != nil {
				panic(err)
			}
			time.Sleep(100 * time.Millisecond)

			// Check that all peers and standbys have the value.
			for i := range etcds {
				resp, err := tests.Get(fmt.Sprintf("http://localhost:%d/v2/keys/foo", 4000+(i+1)))
				if assert.NoError(t, err) {
					body := tests.ReadBodyJSON(resp)
					if node, _ := body["node"].(map[string]interface{}); assert.NotNil(t, node) {
						assert.Equal(t, node["value"], "bar")
					}
				}
			}
		}
	}
}

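// TestStandbyJoinMiss removes a machine from a two-node cluster, simulates a
// stale join request from the removed machine, and verifies that the cluster
// still ends up with a single peer.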
func TestStandbyJoinMiss(t *testing.T) {
	clusterSize := 2
	_, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
	if err != nil {
		t.Fatal("cannot create cluster")
	}
	defer DestroyCluster(etcds)

	c := etcd.NewClient(nil)
	c.SyncCluster()

	time.Sleep(1 * time.Second)

	// Verify that we have two machines.
	result, err := c.Get("_etcd/machines", false, true)
	assert.NoError(t, err)
	assert.Equal(t, len(result.Node.Nodes), clusterSize)

	resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"removeDelay":4, "syncInterval":4}`))
	if !assert.Equal(t, resp.StatusCode, 200) {
		t.FailNow()
	}
	time.Sleep(time.Second)

	resp, _ = tests.Delete("http://localhost:7001/v2/admin/machines/node2", "application/json", nil)
	if !assert.Equal(t, resp.StatusCode, 200) {
		t.FailNow()
	}

	// Wait for a monitor cycle before checking for removal.
	time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second))

	// Verify that we now have one peer.
	result, err = c.Get("_etcd/machines", false, true)
	assert.NoError(t, err)
	assert.Equal(t, len(result.Node.Nodes), 1)

	// Simulate the join failure
	_, err = server.NewClient(nil).AddMachine("http://localhost:7001",
		&server.JoinCommand{
			MinVersion: store.MinVersion(),
			MaxVersion: store.MaxVersion(),
			Name:       "node2",
			RaftURL:    "http://127.0.0.1:7002",
			EtcdURL:    "http://127.0.0.1:4002",
		})
	assert.NoError(t, err)

	time.Sleep(6 * time.Second)

	go tests.Delete("http://localhost:7001/v2/admin/machines/node2", "application/json", nil)

	time.Sleep(time.Second)
	result, err = c.Get("_etcd/machines", false, true)
	assert.NoError(t, err)
	assert.Equal(t, len(result.Node.Nodes), 1)
}
@@ -1,258 +0,0 @@
// +build ignore

/*
Copyright 2013 CoreOS Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

	http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package test

import (
	"errors"
	"fmt"
	"io/ioutil"
	"net"
	"net/http"
	"os"
	"strconv"
	"time"

	"github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
)

var client = http.Client{
	Transport: &http.Transport{
		Dial: dialTimeoutFast,
	},
}

// Set sends set commands in a loop until it is told to stop.
func Set(stop chan bool) {
	stopSet := false
	i := 0
	c := etcd.NewClient(nil)
	for {
		key := fmt.Sprintf("%s_%v", "foo", i)

		result, err := c.Set(key, "bar", 0)

		if err != nil || result.Node.Key != "/"+key || result.Node.Value != "bar" {
			select {
			case <-stop:
				stopSet = true

			default:
			}
		}

		select {
		case <-stop:
			stopSet = true

		default:
		}

		if stopSet {
			break
		}

		i++
	}
	stop <- true
}

func WaitForServer(host string, client http.Client, scheme string) error {
	path := fmt.Sprintf("%s://%s/v2/keys/", scheme, host)

	var resp *http.Response
	var err error
	for i := 0; i < 10; i++ {
		time.Sleep(1 * time.Second)

		resp, err = client.Get(path)
		if err == nil && resp.StatusCode == 200 {
			return nil
		}
	}

	return errors.New(fmt.Sprintf("etcd server was not reachable in time; last response and error: %v; %v", resp, err))
}

// Create a cluster of etcd nodes
func CreateCluster(size int, procAttr *os.ProcAttr, ssl bool) ([][]string, []*os.Process, error) {
	argGroup := make([][]string, size)

	sslServer1 := []string{"-peer-ca-file=../../fixtures/ca/ca.crt",
		"-peer-cert-file=../../fixtures/ca/server.crt",
		"-peer-key-file=../../fixtures/ca/server.key.insecure",
	}

	sslServer2 := []string{"-peer-ca-file=../../fixtures/ca/ca.crt",
		"-peer-cert-file=../../fixtures/ca/server2.crt",
		"-peer-key-file=../../fixtures/ca/server2.key.insecure",
	}

	for i := 0; i < size; i++ {
		if i == 0 {
			argGroup[i] = []string{"etcd", "-data-dir=/tmp/node1", "-name=node1", "-cluster-remove-delay=1800"}
			if ssl {
				argGroup[i] = append(argGroup[i], sslServer1...)
			}
		} else {
			strI := strconv.Itoa(i + 1)
			argGroup[i] = []string{"etcd", "-name=node" + strI, fmt.Sprintf("-addr=127.0.0.1:%d", 4001+i), fmt.Sprintf("-peer-addr=127.0.0.1:%d", 7001+i), "-data-dir=/tmp/node" + strI, "-peers=127.0.0.1:7001", "-cluster-remove-delay=1800"}
			if ssl {
				argGroup[i] = append(argGroup[i], sslServer2...)
			}
		}
	}

	etcds := make([]*os.Process, size)

	for i := range etcds {
		var err error
		etcds[i], err = os.StartProcess(EtcdBinPath, append(argGroup[i], "-f"), procAttr)
		if err != nil {
			return nil, nil, err
		}

		// The problem is that if the master isn't up then the children
		// have to retry. This retry can take upwards of 15 seconds,
		// which slows tests way down and makes some of them fail.
		//
		// Waiting for each server to start when ssl is enabled is a
		// workaround. Autotest machines are dramatically slow, and they
		// can spend several seconds building TLS connections between
		// servers. That is extremely terrible when the second machine
		// joins the cluster, because the cluster is out of work at this
		// time. A machine that tries to join during this period will
		// fail, and the current implementation makes it fail after just
		// a single try (bug in #661). This makes the cluster start with
		// N-1 machines.
		// TODO(yichengq): It should be fixed.
		if i == 0 || ssl {
			client := buildClient()
			err = WaitForServer("127.0.0.1:400"+strconv.Itoa(i+1), client, "http")
			if err != nil {
				return nil, nil, err
			}
		}
	}

	return argGroup, etcds, nil
}

// Destroy all the nodes in the cluster
func DestroyCluster(etcds []*os.Process) error {
	for _, etcd := range etcds {
		if etcd == nil {
			continue
		}
		err := etcd.Kill()
		if err != nil {
			panic(err.Error())
		}
		etcd.Release()
	}
	return nil
}

// Monitor repeatedly polls every node for its current leader. When the
// reachable nodes (at most allowDeadNum may be dead) all agree, it publishes
// the leader on leaderChan; when every node responds, it signals on all.
// It returns once it receives a value on stop.
func Monitor(size int, allowDeadNum int, leaderChan chan string, all chan bool, stop chan bool) {
	leaderMap := make(map[int]string)
	baseAddrFormat := "http://0.0.0.0:%d"

	for {
		knownLeader := "unknown"
		dead := 0
		var i int

		for i = 0; i < size; i++ {
			leader, err := getLeader(fmt.Sprintf(baseAddrFormat, i+4001))

			if err == nil {
				leaderMap[i] = leader

				if knownLeader == "unknown" {
					knownLeader = leader
				} else {
					if leader != knownLeader {
						break
					}
				}
			} else {
				dead++
				if dead > allowDeadNum {
					break
				}
			}
		}

		if i == size {
			select {
			case <-stop:
				return
			case <-leaderChan:
				leaderChan <- knownLeader
			default:
				leaderChan <- knownLeader
			}
		}
		if dead == 0 {
			select {
			case <-all:
				all <- true
			default:
				all <- true
			}
		}

		time.Sleep(time.Millisecond * 10)
	}
}

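// getLeader asks the node at addr for the current leader via its /v2/leader
// endpoint.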
func getLeader(addr string) (string, error) {
	resp, err := client.Get(addr + "/v2/leader")
	if err != nil {
		return "", err
	}

	if resp.StatusCode != http.StatusOK {
		resp.Body.Close()
		return "", fmt.Errorf("no leader")
	}

	b, err := ioutil.ReadAll(resp.Body)
	resp.Body.Close()
	if err != nil {
		return "", err
	}

	return string(b), nil
}

// Dial with timeout
func dialTimeoutFast(network, addr string) (net.Conn, error) {
	return net.DialTimeout(network, addr, time.Millisecond*10)
}