From 058537f34a083ef1ec8c5b280621e2a4c00ebec9 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 16 Oct 2014 13:35:59 -0700 Subject: [PATCH 1/8] main: add basic functional test --- etcdserver/server.go | 12 +-- etcdserver/server_test.go | 8 +- functional/cluster_test.go | 181 +++++++++++++++++++++++++++++++++++++ test | 2 +- 4 files changed, 192 insertions(+), 11 deletions(-) create mode 100644 functional/cluster_test.go diff --git a/etcdserver/server.go b/etcdserver/server.go index 68b21c3cc..f0ec75173 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -146,8 +146,8 @@ type EtcdServer struct { storage Storage - ticker <-chan time.Time - syncTicker <-chan time.Time + Ticker <-chan time.Time + SyncTicker <-chan time.Time snapCount uint64 // number of entries to trigger a snapshot @@ -221,8 +221,8 @@ func NewServer(cfg *ServerConfig) *EtcdServer { stats: sstats, lstats: lstats, send: Sender(cfg.Transport, cls, sstats, lstats), - ticker: time.Tick(100 * time.Millisecond), - syncTicker: time.Tick(500 * time.Millisecond), + Ticker: time.Tick(100 * time.Millisecond), + SyncTicker: time.Tick(500 * time.Millisecond), snapCount: cfg.SnapCount, ClusterStore: cls, } @@ -264,14 +264,14 @@ func (s *EtcdServer) run() { var nodes, removedNodes []uint64 for { select { - case <-s.ticker: + case <-s.Ticker: s.node.Tick() case rd := <-s.node.Ready(): if rd.SoftState != nil { nodes = rd.SoftState.Nodes removedNodes = rd.SoftState.RemovedNodes if rd.RaftState == raft.StateLeader { - syncC = s.syncTicker + syncC = s.SyncTicker } else { syncC = nil } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index e33b4bb30..ef2a878cc 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -473,7 +473,7 @@ func testServer(t *testing.T, ns uint64) { store: store.New(), send: send, storage: &storageRecorder{}, - ticker: tk.C, + Ticker: tk.C, ClusterStore: &clusterStoreRecorder{}, } srv.start() @@ -540,7 +540,7 @@ func TestDoProposal(t *testing.T) { store: st, send: func(_ []raftpb.Message) {}, storage: &storageRecorder{}, - ticker: tk, + Ticker: tk, ClusterStore: &clusterStoreRecorder{}, } srv.start() @@ -611,7 +611,7 @@ func TestDoProposalStopped(t *testing.T) { store: st, send: func(_ []raftpb.Message) {}, storage: &storageRecorder{}, - ticker: tk, + Ticker: tk, } srv.start() @@ -711,7 +711,7 @@ func TestSyncTrigger(t *testing.T) { store: &storeRecorder{}, send: func(_ []raftpb.Message) {}, storage: &storageRecorder{}, - syncTicker: st, + SyncTicker: st, } srv.start() // trigger the server to become a leader and accept sync requests diff --git a/functional/cluster_test.go b/functional/cluster_test.go new file mode 100644 index 000000000..023334bb3 --- /dev/null +++ b/functional/cluster_test.go @@ -0,0 +1,181 @@ +package functional + +import ( + "fmt" + "io/ioutil" + "net" + "net/http" + "net/http/httptest" + "net/url" + "os" + "strings" + "testing" + "time" + + "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/etcdhttp" + "github.com/coreos/etcd/pkg/transport" + "github.com/coreos/etcd/pkg/types" +) + +const tickDuration = 5 * time.Millisecond + +func TestClusterOf1(t *testing.T) { testCluster(t, 1) } +func TestClusterOf3(t *testing.T) { testCluster(t, 3) } + +func testCluster(t *testing.T, size int) { + c := &cluster{Size: size} + c.Launch(t) + for i := 0; i < size; i++ { + for _, u := range c.Members[i].ClientURLs { + if err := setKey(u, "/foo", "bar"); err != nil { + t.Errorf("setKey on %v error: %v", u.String(), err) + } + } + } + c.Terminate(t) +} + +func setKey(u url.URL, key string, value string) error { + u.Path = "/v2/keys" + key + v := url.Values{"value": []string{value}} + req, err := http.NewRequest("PUT", u.String(), strings.NewReader(v.Encode())) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + ioutil.ReadAll(resp.Body) + resp.Body.Close() + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { + return fmt.Errorf("statusCode = %d, want %d or %d", resp.StatusCode, http.StatusOK, http.StatusCreated) + } + return nil +} + +type cluster struct { + Size int + + Members []member +} + +func (c *cluster) Launch(t *testing.T) { + if c.Size <= 0 { + t.Fatalf("cluster size <= 0") + } + + peerListeners := make([]net.Listener, c.Size) + peerAddrs := make([]string, c.Size) + for i := 0; i < c.Size; i++ { + l := newLocalListener(t) + // each member claims only one peer listener + peerListeners[i] = l + peerAddrs[i] = fmt.Sprintf("%s=%s", c.name(i), "http://"+l.Addr().String()) + } + clusterConfig := &etcdserver.Cluster{} + if err := clusterConfig.Set(strings.Join(peerAddrs, ",")); err != nil { + t.Fatal(err) + } + + var err error + for i := 0; i < c.Size; i++ { + m := member{} + m.PeerListeners = []net.Listener{peerListeners[i]} + cln := newLocalListener(t) + m.ClientListeners = []net.Listener{cln} + + m.Name = c.name(i) + m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()}) + if err != nil { + t.Fatal(err) + } + m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd") + if err != nil { + t.Fatal(err) + } + m.Cluster = clusterConfig + m.ClusterState = etcdserver.ClusterStateValueNew + m.Transport, err = transport.NewTransport(transport.TLSInfo{}) + if err != nil { + t.Fatal(err) + } + + m.Launch(t) + c.Members = append(c.Members, m) + } +} + +func (c *cluster) Terminate(t *testing.T) { + for _, m := range c.Members { + m.Terminate(t) + } +} + +func (c *cluster) name(i int) string { + return fmt.Sprint("node", i) +} + +func newLocalListener(t *testing.T) net.Listener { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + return l +} + +type member struct { + etcdserver.ServerConfig + PeerListeners, ClientListeners []net.Listener + + s *etcdserver.EtcdServer + hss []*httptest.Server +} + +func (m *member) Launch(t *testing.T) { + m.s = etcdserver.NewServer(&m.ServerConfig) + m.s.Ticker = time.Tick(tickDuration) + m.s.SyncTicker = nil + m.s.Start() + + for _, ln := range m.PeerListeners { + hs := &httptest.Server{ + Listener: ln, + Config: &http.Server{Handler: etcdhttp.NewPeerHandler(m.s)}, + } + hs.Start() + m.hss = append(m.hss, hs) + } + for _, ln := range m.ClientListeners { + hs := &httptest.Server{ + Listener: ln, + Config: &http.Server{Handler: etcdhttp.NewClientHandler(m.s)}, + } + hs.Start() + m.hss = append(m.hss, hs) + } +} + +func (m *member) Stop(t *testing.T) { + panic("unimplemented") +} + +func (m *member) Start(t *testing.T) { + panic("unimplemented") +} + +func (m *member) Reboot(t *testing.T) { + panic("unimplemented") +} + +func (m *member) Terminate(t *testing.T) { + m.s.Stop() + for _, hs := range m.hss { + hs.Close() + } + if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil { + t.Fatal(err) + } +} diff --git a/test b/test index 52bc71326..4e275f6dc 100755 --- a/test +++ b/test @@ -15,7 +15,7 @@ COVER=${COVER:-"-cover"} source ./build # Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt. -TESTABLE_AND_FORMATTABLE="client etcdserver etcdserver/etcdhttp etcdserver/etcdserverpb pkg pkg/flags pkg/transport proxy raft snap store wait wal" +TESTABLE_AND_FORMATTABLE="client etcdserver etcdserver/etcdhttp etcdserver/etcdserverpb functional pkg pkg/flags pkg/transport proxy raft snap store wait wal" TESTABLE="$TESTABLE_AND_FORMATTABLE ./" FORMATTABLE="$TESTABLE_AND_FORMATTABLE *.go" From f7a0d5387b665feb0e113fa5814406c938665ae6 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 16 Oct 2014 21:37:10 -0700 Subject: [PATCH 2/8] etcdserver: stop server gracefully --- etcdserver/server.go | 8 ++++++-- etcdserver/server_test.go | 8 +++++--- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index f0ec75173..c3cd7fab9 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -127,6 +127,7 @@ type RaftTimer interface { type EtcdServer struct { w wait.Wait done chan struct{} + stopped chan struct{} id uint64 attributes Attributes @@ -247,6 +248,7 @@ func (s *EtcdServer) start() { } s.w = wait.New() s.done = make(chan struct{}) + s.stopped = make(chan struct{}) s.stats.Initialize() // TODO: if this is an empty log, writes all peer infos // into the first entry @@ -312,16 +314,18 @@ func (s *EtcdServer) run() { case <-syncC: s.sync(defaultSyncTimeout) case <-s.done: + close(s.stopped) return } } } -// Stop stops the server, and shuts down the running goroutine. Stop should be -// called after a Start(s), otherwise it will block forever. +// Stop stops the server gracefully, and shuts down the running goroutine. +// Stop should be called after a Start(s), otherwise it will block forever. func (s *EtcdServer) Stop() { s.node.Stop() close(s.done) + <-s.stopped } // Do interprets r and performs an operation on s.store according to r.Method diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index ef2a878cc..5b33ff2c7 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -997,10 +997,12 @@ func TestPublish(t *testing.T) { // TestPublishStopped tests that publish will be stopped if server is stopped. func TestPublishStopped(t *testing.T) { srv := &EtcdServer{ - node: &nodeRecorder{}, - w: &waitRecorder{}, - done: make(chan struct{}), + node: &nodeRecorder{}, + w: &waitRecorder{}, + done: make(chan struct{}), + stopped: make(chan struct{}), } + close(srv.stopped) srv.Stop() srv.publish(time.Hour) } From 40279d324aade9d43fa48f9f1f40b69a66db754a Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 16 Oct 2014 23:57:54 -0700 Subject: [PATCH 3/8] functional: add TODO --- functional/cluster_test.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/functional/cluster_test.go b/functional/cluster_test.go index 023334bb3..01f28ca4a 100644 --- a/functional/cluster_test.go +++ b/functional/cluster_test.go @@ -36,6 +36,7 @@ func testCluster(t *testing.T, size int) { c.Terminate(t) } +// TODO: use etcd client func setKey(u url.URL, key string, value string) error { u.Path = "/v2/keys" + key v := url.Values{"value": []string{value}} @@ -62,28 +63,29 @@ type cluster struct { Members []member } +// TODO: support TLS func (c *cluster) Launch(t *testing.T) { if c.Size <= 0 { t.Fatalf("cluster size <= 0") } - peerListeners := make([]net.Listener, c.Size) - peerAddrs := make([]string, c.Size) + lns := make([]net.Listener, c.Size) + btConfs := make([]string, c.Size) for i := 0; i < c.Size; i++ { l := newLocalListener(t) // each member claims only one peer listener - peerListeners[i] = l - peerAddrs[i] = fmt.Sprintf("%s=%s", c.name(i), "http://"+l.Addr().String()) + lns[i] = l + btConfs[i] = fmt.Sprintf("%s=%s", c.name(i), "http://"+l.Addr().String()) } clusterConfig := &etcdserver.Cluster{} - if err := clusterConfig.Set(strings.Join(peerAddrs, ",")); err != nil { + if err := clusterConfig.Set(strings.Join(btConfs, ",")); err != nil { t.Fatal(err) } var err error for i := 0; i < c.Size; i++ { m := member{} - m.PeerListeners = []net.Listener{peerListeners[i]} + m.PeerListeners = []net.Listener{lns[i]} cln := newLocalListener(t) m.ClientListeners = []net.Listener{cln} From 1b7947357f602ee70e6ebf8b477156cc3adc8413 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 16 Oct 2014 23:59:58 -0700 Subject: [PATCH 4/8] *: pkg functional -> integration --- {functional => integration}/cluster_test.go | 2 +- test | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename {functional => integration}/cluster_test.go (99%) diff --git a/functional/cluster_test.go b/integration/cluster_test.go similarity index 99% rename from functional/cluster_test.go rename to integration/cluster_test.go index 01f28ca4a..4dd0937da 100644 --- a/functional/cluster_test.go +++ b/integration/cluster_test.go @@ -1,4 +1,4 @@ -package functional +package integration import ( "fmt" diff --git a/test b/test index 4e275f6dc..248ddae21 100755 --- a/test +++ b/test @@ -15,7 +15,7 @@ COVER=${COVER:-"-cover"} source ./build # Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt. -TESTABLE_AND_FORMATTABLE="client etcdserver etcdserver/etcdhttp etcdserver/etcdserverpb functional pkg pkg/flags pkg/transport proxy raft snap store wait wal" +TESTABLE_AND_FORMATTABLE="client etcdserver etcdserver/etcdhttp etcdserver/etcdserverpb integration pkg pkg/flags pkg/transport proxy raft snap store wait wal" TESTABLE="$TESTABLE_AND_FORMATTABLE ./" FORMATTABLE="$TESTABLE_AND_FORMATTABLE *.go" From 7af679333a0b11d8054899db1874b7fca2591b2c Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 17 Oct 2014 11:25:45 -0700 Subject: [PATCH 5/8] integration: log microsecond time for integration tests --- integration/cluster_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/integration/cluster_test.go b/integration/cluster_test.go index 4dd0937da..421c7fd77 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -3,6 +3,7 @@ package integration import ( "fmt" "io/ioutil" + "log" "net" "net/http" "net/http/httptest" @@ -20,6 +21,11 @@ import ( const tickDuration = 5 * time.Millisecond +func init() { + // open microsecond-level time log for integration test debugging + log.SetFlags(log.Ltime | log.Lmicroseconds | log.Lshortfile) +} + func TestClusterOf1(t *testing.T) { testCluster(t, 1) } func TestClusterOf3(t *testing.T) { testCluster(t, 3) } From 500a72962e8b736b7cd102ef25e5eb3a7a16b160 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 17 Oct 2014 11:30:52 -0700 Subject: [PATCH 6/8] integration: clean code remove extra space, rename variables, remove unused function. --- integration/cluster_test.go | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/integration/cluster_test.go b/integration/cluster_test.go index 421c7fd77..13df22091 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -64,8 +64,7 @@ func setKey(u url.URL, key string, value string) error { } type cluster struct { - Size int - + Size int Members []member } @@ -76,15 +75,15 @@ func (c *cluster) Launch(t *testing.T) { } lns := make([]net.Listener, c.Size) - btConfs := make([]string, c.Size) + bootstrapCfgs := make([]string, c.Size) for i := 0; i < c.Size; i++ { l := newLocalListener(t) // each member claims only one peer listener lns[i] = l - btConfs[i] = fmt.Sprintf("%s=%s", c.name(i), "http://"+l.Addr().String()) + bootstrapCfgs[i] = fmt.Sprintf("%s=%s", c.name(i), "http://"+l.Addr().String()) } - clusterConfig := &etcdserver.Cluster{} - if err := clusterConfig.Set(strings.Join(btConfs, ",")); err != nil { + clusterCfg := &etcdserver.Cluster{} + if err := clusterCfg.Set(strings.Join(bootstrapCfgs, ",")); err != nil { t.Fatal(err) } @@ -94,7 +93,6 @@ func (c *cluster) Launch(t *testing.T) { m.PeerListeners = []net.Listener{lns[i]} cln := newLocalListener(t) m.ClientListeners = []net.Listener{cln} - m.Name = c.name(i) m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()}) if err != nil { @@ -104,7 +102,7 @@ func (c *cluster) Launch(t *testing.T) { if err != nil { t.Fatal(err) } - m.Cluster = clusterConfig + m.Cluster = clusterCfg m.ClusterState = etcdserver.ClusterStateValueNew m.Transport, err = transport.NewTransport(transport.TLSInfo{}) if err != nil { @@ -174,10 +172,6 @@ func (m *member) Start(t *testing.T) { panic("unimplemented") } -func (m *member) Reboot(t *testing.T) { - panic("unimplemented") -} - func (m *member) Terminate(t *testing.T) { m.s.Stop() for _, hs := range m.hss { From 80212aaf4d9209988760cd7239d75ab58b687b7a Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 17 Oct 2014 14:01:32 -0700 Subject: [PATCH 7/8] integration: retry on setKey to avoid timeout due to bootstrap --- integration/cluster_test.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/integration/cluster_test.go b/integration/cluster_test.go index 13df22091..1b1bc3fbe 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -34,7 +34,13 @@ func testCluster(t *testing.T, size int) { c.Launch(t) for i := 0; i < size; i++ { for _, u := range c.Members[i].ClientURLs { - if err := setKey(u, "/foo", "bar"); err != nil { + var err error + for j := 0; j < 3; j++ { + if err = setKey(u, "/foo", "bar"); err == nil { + break + } + } + if err != nil { t.Errorf("setKey on %v error: %v", u.String(), err) } } From ef44ba10cfa4d46c39191a5ab6a8a05af2e505e9 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 20 Oct 2014 14:42:11 -0700 Subject: [PATCH 8/8] integration: add doc --- integration/cluster_test.go | 5 +++++ integration/doc.go | 25 +++++++++++++++++++++++++ 2 files changed, 30 insertions(+) create mode 100644 integration/doc.go diff --git a/integration/cluster_test.go b/integration/cluster_test.go index 1b1bc3fbe..fe1e3a0a7 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -146,6 +146,8 @@ type member struct { hss []*httptest.Server } +// Launch starts a member based on ServerConfig, PeerListeners +// and ClientListeners. func (m *member) Launch(t *testing.T) { m.s = etcdserver.NewServer(&m.ServerConfig) m.s.Ticker = time.Tick(tickDuration) @@ -170,14 +172,17 @@ func (m *member) Launch(t *testing.T) { } } +// Stop stops the member, but the data dir of the member is preserved. func (m *member) Stop(t *testing.T) { panic("unimplemented") } +// Start starts the member using preserved data dir. func (m *member) Start(t *testing.T) { panic("unimplemented") } +// Terminate stops the member and remove the data dir. func (m *member) Terminate(t *testing.T) { m.s.Stop() for _, hs := range m.hss { diff --git a/integration/doc.go b/integration/doc.go new file mode 100644 index 000000000..c410d0842 --- /dev/null +++ b/integration/doc.go @@ -0,0 +1,25 @@ +// Copyright 2014 CoreOS Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +functional tests are built upon embeded etcd, and focus on etcd functional +correctness. + +Its goal: +1. it tests the whole code base except the command line parse. +2. it is able to check internal data, including raft, store and etc. +3. it is based on goroutine, which is faster than process. +4. it mainly tests user behavior and user-facing API. +*/ +package integration