Merge pull request #1318 from unihorn/164

main: add basic integration test
This commit is contained in:
Yicheng Qin 2014-10-20 14:45:22 -07:00
commit 68da8084d0
5 changed files with 241 additions and 16 deletions

View File

@ -127,6 +127,7 @@ type RaftTimer interface {
type EtcdServer struct {
w wait.Wait
done chan struct{}
stopped chan struct{}
id uint64
attributes Attributes
@ -146,8 +147,8 @@ type EtcdServer struct {
storage Storage
ticker <-chan time.Time
syncTicker <-chan time.Time
Ticker <-chan time.Time
SyncTicker <-chan time.Time
snapCount uint64 // number of entries to trigger a snapshot
@ -221,8 +222,8 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
stats: sstats,
lstats: lstats,
send: Sender(cfg.Transport, cls, sstats, lstats),
ticker: time.Tick(100 * time.Millisecond),
syncTicker: time.Tick(500 * time.Millisecond),
Ticker: time.Tick(100 * time.Millisecond),
SyncTicker: time.Tick(500 * time.Millisecond),
snapCount: cfg.SnapCount,
ClusterStore: cls,
}
@ -247,6 +248,7 @@ func (s *EtcdServer) start() {
}
s.w = wait.New()
s.done = make(chan struct{})
s.stopped = make(chan struct{})
s.stats.Initialize()
// TODO: if this is an empty log, writes all peer infos
// into the first entry
@ -264,14 +266,14 @@ func (s *EtcdServer) run() {
var nodes, removedNodes []uint64
for {
select {
case <-s.ticker:
case <-s.Ticker:
s.node.Tick()
case rd := <-s.node.Ready():
if rd.SoftState != nil {
nodes = rd.SoftState.Nodes
removedNodes = rd.SoftState.RemovedNodes
if rd.RaftState == raft.StateLeader {
syncC = s.syncTicker
syncC = s.SyncTicker
} else {
syncC = nil
}
@ -312,16 +314,18 @@ func (s *EtcdServer) run() {
case <-syncC:
s.sync(defaultSyncTimeout)
case <-s.done:
close(s.stopped)
return
}
}
}
// Stop stops the server, and shuts down the running goroutine. Stop should be
// called after a Start(s), otherwise it will block forever.
// Stop stops the server gracefully, and shuts down the running goroutine.
// Stop should be called after a Start(s), otherwise it will block forever.
func (s *EtcdServer) Stop() {
s.node.Stop()
close(s.done)
<-s.stopped
}
// Do interprets r and performs an operation on s.store according to r.Method

View File

@ -473,7 +473,7 @@ func testServer(t *testing.T, ns uint64) {
store: store.New(),
send: send,
storage: &storageRecorder{},
ticker: tk.C,
Ticker: tk.C,
ClusterStore: &clusterStoreRecorder{},
}
srv.start()
@ -540,7 +540,7 @@ func TestDoProposal(t *testing.T) {
store: st,
send: func(_ []raftpb.Message) {},
storage: &storageRecorder{},
ticker: tk,
Ticker: tk,
ClusterStore: &clusterStoreRecorder{},
}
srv.start()
@ -611,7 +611,7 @@ func TestDoProposalStopped(t *testing.T) {
store: st,
send: func(_ []raftpb.Message) {},
storage: &storageRecorder{},
ticker: tk,
Ticker: tk,
}
srv.start()
@ -711,7 +711,7 @@ func TestSyncTrigger(t *testing.T) {
store: &storeRecorder{},
send: func(_ []raftpb.Message) {},
storage: &storageRecorder{},
syncTicker: st,
SyncTicker: st,
}
srv.start()
// trigger the server to become a leader and accept sync requests
@ -997,10 +997,12 @@ func TestPublish(t *testing.T) {
// TestPublishStopped tests that publish will be stopped if server is stopped.
func TestPublishStopped(t *testing.T) {
srv := &EtcdServer{
node: &nodeRecorder{},
w: &waitRecorder{},
done: make(chan struct{}),
node: &nodeRecorder{},
w: &waitRecorder{},
done: make(chan struct{}),
stopped: make(chan struct{}),
}
close(srv.stopped)
srv.Stop()
srv.publish(time.Hour)
}

194
integration/cluster_test.go Normal file
View File

@ -0,0 +1,194 @@
package integration
import (
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"net/http/httptest"
"net/url"
"os"
"strings"
"testing"
"time"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/etcdhttp"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
)
const tickDuration = 5 * time.Millisecond
func init() {
// open microsecond-level time log for integration test debugging
log.SetFlags(log.Ltime | log.Lmicroseconds | log.Lshortfile)
}
func TestClusterOf1(t *testing.T) { testCluster(t, 1) }
func TestClusterOf3(t *testing.T) { testCluster(t, 3) }
func testCluster(t *testing.T, size int) {
c := &cluster{Size: size}
c.Launch(t)
for i := 0; i < size; i++ {
for _, u := range c.Members[i].ClientURLs {
var err error
for j := 0; j < 3; j++ {
if err = setKey(u, "/foo", "bar"); err == nil {
break
}
}
if err != nil {
t.Errorf("setKey on %v error: %v", u.String(), err)
}
}
}
c.Terminate(t)
}
// TODO: use etcd client
func setKey(u url.URL, key string, value string) error {
u.Path = "/v2/keys" + key
v := url.Values{"value": []string{value}}
req, err := http.NewRequest("PUT", u.String(), strings.NewReader(v.Encode()))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
ioutil.ReadAll(resp.Body)
resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
return fmt.Errorf("statusCode = %d, want %d or %d", resp.StatusCode, http.StatusOK, http.StatusCreated)
}
return nil
}
type cluster struct {
Size int
Members []member
}
// TODO: support TLS
func (c *cluster) Launch(t *testing.T) {
if c.Size <= 0 {
t.Fatalf("cluster size <= 0")
}
lns := make([]net.Listener, c.Size)
bootstrapCfgs := make([]string, c.Size)
for i := 0; i < c.Size; i++ {
l := newLocalListener(t)
// each member claims only one peer listener
lns[i] = l
bootstrapCfgs[i] = fmt.Sprintf("%s=%s", c.name(i), "http://"+l.Addr().String())
}
clusterCfg := &etcdserver.Cluster{}
if err := clusterCfg.Set(strings.Join(bootstrapCfgs, ",")); err != nil {
t.Fatal(err)
}
var err error
for i := 0; i < c.Size; i++ {
m := member{}
m.PeerListeners = []net.Listener{lns[i]}
cln := newLocalListener(t)
m.ClientListeners = []net.Listener{cln}
m.Name = c.name(i)
m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()})
if err != nil {
t.Fatal(err)
}
m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd")
if err != nil {
t.Fatal(err)
}
m.Cluster = clusterCfg
m.ClusterState = etcdserver.ClusterStateValueNew
m.Transport, err = transport.NewTransport(transport.TLSInfo{})
if err != nil {
t.Fatal(err)
}
m.Launch(t)
c.Members = append(c.Members, m)
}
}
func (c *cluster) Terminate(t *testing.T) {
for _, m := range c.Members {
m.Terminate(t)
}
}
func (c *cluster) name(i int) string {
return fmt.Sprint("node", i)
}
func newLocalListener(t *testing.T) net.Listener {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
return l
}
type member struct {
etcdserver.ServerConfig
PeerListeners, ClientListeners []net.Listener
s *etcdserver.EtcdServer
hss []*httptest.Server
}
// Launch starts a member based on ServerConfig, PeerListeners
// and ClientListeners.
func (m *member) Launch(t *testing.T) {
m.s = etcdserver.NewServer(&m.ServerConfig)
m.s.Ticker = time.Tick(tickDuration)
m.s.SyncTicker = nil
m.s.Start()
for _, ln := range m.PeerListeners {
hs := &httptest.Server{
Listener: ln,
Config: &http.Server{Handler: etcdhttp.NewPeerHandler(m.s)},
}
hs.Start()
m.hss = append(m.hss, hs)
}
for _, ln := range m.ClientListeners {
hs := &httptest.Server{
Listener: ln,
Config: &http.Server{Handler: etcdhttp.NewClientHandler(m.s)},
}
hs.Start()
m.hss = append(m.hss, hs)
}
}
// Stop stops the member, but the data dir of the member is preserved.
func (m *member) Stop(t *testing.T) {
panic("unimplemented")
}
// Start starts the member using preserved data dir.
func (m *member) Start(t *testing.T) {
panic("unimplemented")
}
// Terminate stops the member and remove the data dir.
func (m *member) Terminate(t *testing.T) {
m.s.Stop()
for _, hs := range m.hss {
hs.Close()
}
if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
t.Fatal(err)
}
}

25
integration/doc.go Normal file
View File

@ -0,0 +1,25 @@
// Copyright 2014 CoreOS Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
functional tests are built upon embeded etcd, and focus on etcd functional
correctness.
Its goal:
1. it tests the whole code base except the command line parse.
2. it is able to check internal data, including raft, store and etc.
3. it is based on goroutine, which is faster than process.
4. it mainly tests user behavior and user-facing API.
*/
package integration

2
test
View File

@ -15,7 +15,7 @@ COVER=${COVER:-"-cover"}
source ./build
# Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt.
TESTABLE_AND_FORMATTABLE="client etcdserver etcdserver/etcdhttp etcdserver/etcdserverpb pkg pkg/flags pkg/transport proxy raft snap store wait wal"
TESTABLE_AND_FORMATTABLE="client etcdserver etcdserver/etcdhttp etcdserver/etcdserverpb integration pkg pkg/flags pkg/transport proxy raft snap store wait wal"
TESTABLE="$TESTABLE_AND_FORMATTABLE ./"
FORMATTABLE="$TESTABLE_AND_FORMATTABLE *.go"