mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #775 from unihorn/84
refactor(tests/server_utils): use etcd instance
This commit is contained in:
commit
af33d61774
@ -142,16 +142,6 @@ func (c *Config) Load(arguments []string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Sanitize all the input fields.
|
||||
if err := c.Sanitize(); err != nil {
|
||||
return fmt.Errorf("sanitize: %v", err)
|
||||
}
|
||||
|
||||
// Force remove server configuration if specified.
|
||||
if c.Force {
|
||||
c.Reset()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -479,7 +479,7 @@ func TestConfigCustomConfigOverrideSystemConfig(t *testing.T) {
|
||||
c := New()
|
||||
c.SystemPath = p1
|
||||
assert.Nil(t, c.Load([]string{"-config", p2}), "")
|
||||
assert.Equal(t, c.Addr, "http://127.0.0.1:6000", "")
|
||||
assert.Equal(t, c.Addr, "127.0.0.1:6000", "")
|
||||
})
|
||||
})
|
||||
}
|
||||
@ -494,7 +494,7 @@ func TestConfigEnvVarOverrideCustomConfig(t *testing.T) {
|
||||
c := New()
|
||||
c.SystemPath = ""
|
||||
assert.Nil(t, c.Load([]string{"-config", path}), "")
|
||||
assert.Equal(t, c.Peer.Addr, "http://127.0.0.1:8000", "")
|
||||
assert.Equal(t, c.Peer.Addr, "127.0.0.1:8000", "")
|
||||
})
|
||||
}
|
||||
|
||||
@ -506,7 +506,7 @@ func TestConfigCLIArgsOverrideEnvVar(t *testing.T) {
|
||||
c := New()
|
||||
c.SystemPath = ""
|
||||
assert.Nil(t, c.Load([]string{"-addr", "127.0.0.1:2000"}), "")
|
||||
assert.Equal(t, c.Addr, "http://127.0.0.1:2000", "")
|
||||
assert.Equal(t, c.Addr, "127.0.0.1:2000", "")
|
||||
}
|
||||
|
||||
//--------------------------------------
|
||||
|
10
etcd/etcd.go
10
etcd/etcd.go
@ -61,6 +61,16 @@ func New(c *config.Config) *Etcd {
|
||||
|
||||
// Run the etcd instance.
|
||||
func (e *Etcd) Run() {
|
||||
// Sanitize all the input fields.
|
||||
if err := e.Config.Sanitize(); err != nil {
|
||||
log.Fatalf("failed sanitizing configuration: %v", err)
|
||||
}
|
||||
|
||||
// Force remove server configuration if specified.
|
||||
if e.Config.Force {
|
||||
e.Config.Reset()
|
||||
}
|
||||
|
||||
// Enable options.
|
||||
if e.Config.VeryVeryVerbose {
|
||||
log.Verbose = true
|
||||
|
@ -34,10 +34,6 @@ func TestRunStop(t *testing.T) {
|
||||
config.Addr = "localhost:0"
|
||||
config.Peer.Addr = "localhost:0"
|
||||
|
||||
if err := config.Sanitize(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
etcd := New(config)
|
||||
go etcd.Run()
|
||||
<-etcd.ReadyNotify()
|
||||
|
@ -59,6 +59,10 @@ func (h *CORSHandler) addHeader(w http.ResponseWriter, origin string) {
|
||||
// ServeHTTP adds the correct CORS headers based on the origin and returns immediately
|
||||
// with a 200 OK if the method is OPTIONS.
|
||||
func (h *CORSHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
// It is important to flush before leaving the goroutine.
|
||||
// Or it may miss the latest info written.
|
||||
defer w.(http.Flusher).Flush()
|
||||
|
||||
// Write CORS header.
|
||||
if h.Info.OriginAllowed("*") {
|
||||
h.addHeader(w, "*")
|
||||
|
@ -1,17 +1,9 @@
|
||||
package tests
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
|
||||
|
||||
"github.com/coreos/etcd/metrics"
|
||||
"github.com/coreos/etcd/config"
|
||||
"github.com/coreos/etcd/etcd"
|
||||
"github.com/coreos/etcd/server"
|
||||
"github.com/coreos/etcd/store"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -19,94 +11,30 @@ const (
|
||||
testClientURL = "localhost:4401"
|
||||
testRaftURL = "localhost:7701"
|
||||
testSnapshotCount = 10000
|
||||
testHeartbeatInterval = time.Duration(50) * time.Millisecond
|
||||
testElectionTimeout = time.Duration(200) * time.Millisecond
|
||||
testHeartbeatInterval = 50
|
||||
testElectionTimeout = 200
|
||||
testDataDir = "/tmp/ETCDTEST"
|
||||
)
|
||||
|
||||
// Starts a server in a temporary directory.
|
||||
// Starts a new server.
|
||||
func RunServer(f func(*server.Server)) {
|
||||
path, _ := ioutil.TempDir("", "etcd-")
|
||||
defer os.RemoveAll(path)
|
||||
c := config.New()
|
||||
|
||||
store := store.New()
|
||||
registry := server.NewRegistry(store)
|
||||
c.Name = testName
|
||||
c.Addr = testClientURL
|
||||
c.Peer.Addr = testRaftURL
|
||||
|
||||
serverStats := server.NewRaftServerStats(testName)
|
||||
followersStats := server.NewRaftFollowersStats(testName)
|
||||
c.DataDir = testDataDir
|
||||
c.Force = true
|
||||
|
||||
psConfig := server.PeerServerConfig{
|
||||
Name: testName,
|
||||
URL: "http://" + testRaftURL,
|
||||
Scheme: "http",
|
||||
SnapshotCount: testSnapshotCount,
|
||||
}
|
||||
|
||||
mb := metrics.NewBucket("")
|
||||
|
||||
ps := server.NewPeerServer(psConfig, registry, store, &mb, followersStats, serverStats)
|
||||
psListener := server.NewListener("http", testRaftURL, nil)
|
||||
|
||||
// Create Raft transporter and server
|
||||
dialTimeout := (3 * testHeartbeatInterval) + testElectionTimeout
|
||||
responseHeaderTimeout := (3 * testHeartbeatInterval) + testElectionTimeout
|
||||
raftTransporter := server.NewTransporter(followersStats, serverStats, registry, testHeartbeatInterval, dialTimeout, responseHeaderTimeout)
|
||||
raftServer, err := raft.NewServer(testName, path, raftTransporter, store, ps, "")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
raftServer.SetElectionTimeout(testElectionTimeout)
|
||||
raftServer.SetHeartbeatInterval(testHeartbeatInterval)
|
||||
ps.SetRaftServer(raftServer)
|
||||
|
||||
s := server.New(testName, "http://"+testClientURL, ps, registry, store, nil)
|
||||
sListener := server.NewListener("http", testClientURL, nil)
|
||||
|
||||
ps.SetServer(s)
|
||||
|
||||
w := &sync.WaitGroup{}
|
||||
|
||||
// Start up peer server.
|
||||
c := make(chan bool)
|
||||
go func() {
|
||||
c <- true
|
||||
ps.Start(false, "", []string{})
|
||||
h := waitHandler{w, ps.HTTPHandler()}
|
||||
http.Serve(psListener, &h)
|
||||
}()
|
||||
<-c
|
||||
|
||||
// Start up etcd server.
|
||||
go func() {
|
||||
c <- true
|
||||
h := waitHandler{w, s.HTTPHandler()}
|
||||
http.Serve(sListener, &h)
|
||||
}()
|
||||
<-c
|
||||
|
||||
// Wait to make sure servers have started.
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
c.Peer.HeartbeatInterval = testHeartbeatInterval
|
||||
c.Peer.ElectionTimeout = testElectionTimeout
|
||||
c.SnapshotCount = testSnapshotCount
|
||||
|
||||
i := etcd.New(c)
|
||||
go i.Run()
|
||||
<-i.ReadyNotify()
|
||||
// Execute the function passed in.
|
||||
f(s)
|
||||
|
||||
// Clean up servers.
|
||||
ps.Stop()
|
||||
psListener.Close()
|
||||
sListener.Close()
|
||||
w.Wait()
|
||||
}
|
||||
|
||||
type waitHandler struct {
|
||||
wg *sync.WaitGroup
|
||||
handler http.Handler
|
||||
}
|
||||
|
||||
func (h *waitHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
h.wg.Add(1)
|
||||
defer h.wg.Done()
|
||||
h.handler.ServeHTTP(w, r)
|
||||
|
||||
//important to flush before decrementing the wait group.
|
||||
//we won't get a chance to once main() ends.
|
||||
w.(http.Flusher).Flush()
|
||||
f(i.Server)
|
||||
i.Stop()
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user