From 058537f34a083ef1ec8c5b280621e2a4c00ebec9 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 16 Oct 2014 13:35:59 -0700 Subject: [PATCH] main: add basic functional test --- etcdserver/server.go | 12 +-- etcdserver/server_test.go | 8 +- functional/cluster_test.go | 181 +++++++++++++++++++++++++++++++++++++ test | 2 +- 4 files changed, 192 insertions(+), 11 deletions(-) create mode 100644 functional/cluster_test.go diff --git a/etcdserver/server.go b/etcdserver/server.go index 68b21c3cc..f0ec75173 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -146,8 +146,8 @@ type EtcdServer struct { storage Storage - ticker <-chan time.Time - syncTicker <-chan time.Time + Ticker <-chan time.Time + SyncTicker <-chan time.Time snapCount uint64 // number of entries to trigger a snapshot @@ -221,8 +221,8 @@ func NewServer(cfg *ServerConfig) *EtcdServer { stats: sstats, lstats: lstats, send: Sender(cfg.Transport, cls, sstats, lstats), - ticker: time.Tick(100 * time.Millisecond), - syncTicker: time.Tick(500 * time.Millisecond), + Ticker: time.Tick(100 * time.Millisecond), + SyncTicker: time.Tick(500 * time.Millisecond), snapCount: cfg.SnapCount, ClusterStore: cls, } @@ -264,14 +264,14 @@ func (s *EtcdServer) run() { var nodes, removedNodes []uint64 for { select { - case <-s.ticker: + case <-s.Ticker: s.node.Tick() case rd := <-s.node.Ready(): if rd.SoftState != nil { nodes = rd.SoftState.Nodes removedNodes = rd.SoftState.RemovedNodes if rd.RaftState == raft.StateLeader { - syncC = s.syncTicker + syncC = s.SyncTicker } else { syncC = nil } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index e33b4bb30..ef2a878cc 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -473,7 +473,7 @@ func testServer(t *testing.T, ns uint64) { store: store.New(), send: send, storage: &storageRecorder{}, - ticker: tk.C, + Ticker: tk.C, ClusterStore: &clusterStoreRecorder{}, } srv.start() @@ -540,7 +540,7 @@ func TestDoProposal(t *testing.T) { store: st, send: func(_ []raftpb.Message) {}, storage: &storageRecorder{}, - ticker: tk, + Ticker: tk, ClusterStore: &clusterStoreRecorder{}, } srv.start() @@ -611,7 +611,7 @@ func TestDoProposalStopped(t *testing.T) { store: st, send: func(_ []raftpb.Message) {}, storage: &storageRecorder{}, - ticker: tk, + Ticker: tk, } srv.start() @@ -711,7 +711,7 @@ func TestSyncTrigger(t *testing.T) { store: &storeRecorder{}, send: func(_ []raftpb.Message) {}, storage: &storageRecorder{}, - syncTicker: st, + SyncTicker: st, } srv.start() // trigger the server to become a leader and accept sync requests diff --git a/functional/cluster_test.go b/functional/cluster_test.go new file mode 100644 index 000000000..023334bb3 --- /dev/null +++ b/functional/cluster_test.go @@ -0,0 +1,181 @@ +package functional + +import ( + "fmt" + "io/ioutil" + "net" + "net/http" + "net/http/httptest" + "net/url" + "os" + "strings" + "testing" + "time" + + "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/etcdhttp" + "github.com/coreos/etcd/pkg/transport" + "github.com/coreos/etcd/pkg/types" +) + +const tickDuration = 5 * time.Millisecond + +func TestClusterOf1(t *testing.T) { testCluster(t, 1) } +func TestClusterOf3(t *testing.T) { testCluster(t, 3) } + +func testCluster(t *testing.T, size int) { + c := &cluster{Size: size} + c.Launch(t) + for i := 0; i < size; i++ { + for _, u := range c.Members[i].ClientURLs { + if err := setKey(u, "/foo", "bar"); err != nil { + t.Errorf("setKey on %v error: %v", u.String(), err) + } + } + } + c.Terminate(t) +} + +func setKey(u url.URL, key string, value string) error { + u.Path = "/v2/keys" + key + v := url.Values{"value": []string{value}} + req, err := http.NewRequest("PUT", u.String(), strings.NewReader(v.Encode())) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + ioutil.ReadAll(resp.Body) + resp.Body.Close() + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { + return fmt.Errorf("statusCode = %d, want %d or %d", resp.StatusCode, http.StatusOK, http.StatusCreated) + } + return nil +} + +type cluster struct { + Size int + + Members []member +} + +func (c *cluster) Launch(t *testing.T) { + if c.Size <= 0 { + t.Fatalf("cluster size <= 0") + } + + peerListeners := make([]net.Listener, c.Size) + peerAddrs := make([]string, c.Size) + for i := 0; i < c.Size; i++ { + l := newLocalListener(t) + // each member claims only one peer listener + peerListeners[i] = l + peerAddrs[i] = fmt.Sprintf("%s=%s", c.name(i), "http://"+l.Addr().String()) + } + clusterConfig := &etcdserver.Cluster{} + if err := clusterConfig.Set(strings.Join(peerAddrs, ",")); err != nil { + t.Fatal(err) + } + + var err error + for i := 0; i < c.Size; i++ { + m := member{} + m.PeerListeners = []net.Listener{peerListeners[i]} + cln := newLocalListener(t) + m.ClientListeners = []net.Listener{cln} + + m.Name = c.name(i) + m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()}) + if err != nil { + t.Fatal(err) + } + m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd") + if err != nil { + t.Fatal(err) + } + m.Cluster = clusterConfig + m.ClusterState = etcdserver.ClusterStateValueNew + m.Transport, err = transport.NewTransport(transport.TLSInfo{}) + if err != nil { + t.Fatal(err) + } + + m.Launch(t) + c.Members = append(c.Members, m) + } +} + +func (c *cluster) Terminate(t *testing.T) { + for _, m := range c.Members { + m.Terminate(t) + } +} + +func (c *cluster) name(i int) string { + return fmt.Sprint("node", i) +} + +func newLocalListener(t *testing.T) net.Listener { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + return l +} + +type member struct { + etcdserver.ServerConfig + PeerListeners, ClientListeners []net.Listener + + s *etcdserver.EtcdServer + hss []*httptest.Server +} + +func (m *member) Launch(t *testing.T) { + m.s = etcdserver.NewServer(&m.ServerConfig) + m.s.Ticker = time.Tick(tickDuration) + m.s.SyncTicker = nil + m.s.Start() + + for _, ln := range m.PeerListeners { + hs := &httptest.Server{ + Listener: ln, + Config: &http.Server{Handler: etcdhttp.NewPeerHandler(m.s)}, + } + hs.Start() + m.hss = append(m.hss, hs) + } + for _, ln := range m.ClientListeners { + hs := &httptest.Server{ + Listener: ln, + Config: &http.Server{Handler: etcdhttp.NewClientHandler(m.s)}, + } + hs.Start() + m.hss = append(m.hss, hs) + } +} + +func (m *member) Stop(t *testing.T) { + panic("unimplemented") +} + +func (m *member) Start(t *testing.T) { + panic("unimplemented") +} + +func (m *member) Reboot(t *testing.T) { + panic("unimplemented") +} + +func (m *member) Terminate(t *testing.T) { + m.s.Stop() + for _, hs := range m.hss { + hs.Close() + } + if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil { + t.Fatal(err) + } +} diff --git a/test b/test index 52bc71326..4e275f6dc 100755 --- a/test +++ b/test @@ -15,7 +15,7 @@ COVER=${COVER:-"-cover"} source ./build # Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt. -TESTABLE_AND_FORMATTABLE="client etcdserver etcdserver/etcdhttp etcdserver/etcdserverpb pkg pkg/flags pkg/transport proxy raft snap store wait wal" +TESTABLE_AND_FORMATTABLE="client etcdserver etcdserver/etcdhttp etcdserver/etcdserverpb functional pkg pkg/flags pkg/transport proxy raft snap store wait wal" TESTABLE="$TESTABLE_AND_FORMATTABLE ./" FORMATTABLE="$TESTABLE_AND_FORMATTABLE *.go"