Merge pull request #1201 from unihorn/143

etcdserver: publish self info when start
Yicheng Qin 2014-10-01 11:56:20 -07:00
commit f84b5b1071
3 changed files with 67 additions and 29 deletions

View File

@@ -19,6 +19,8 @@ import (
 const (
 	defaultSyncTimeout = time.Second
 	DefaultSnapCount = 10000
+	// TODO: calculated based on heartbeat interval
+	defaultPublishRetryInterval = 5 * time.Second
 )

 var (
@@ -78,6 +80,9 @@ type EtcdServer struct {
 	w wait.Wait
 	done chan struct{}
+	Name string
+	ClientURLs []string
 	Node raft.Node
 	Store store.Store
@@ -102,7 +107,16 @@ type EtcdServer struct {
 // Start prepares and starts server in a new goroutine. It is no longer safe to
 // modify a server's fields after it has been sent to Start.
+// It also starts a goroutine to publish its server information.
 func (s *EtcdServer) Start() {
+	s.start()
+	go s.publish(defaultPublishRetryInterval)
+}
+
+// start prepares and starts server in a new goroutine. It is no longer safe to
+// modify a server's fields after it has been sent to Start.
+// This function is just used for testing.
+func (s *EtcdServer) start() {
 	if s.SnapCount == 0 {
 		log.Printf("etcdserver: set snapshot count to default %d", DefaultSnapCount)
 		s.SnapCount = DefaultSnapCount
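
The Start/start split above keeps the background publisher out of unit tests: the exported Start runs the internal start and then launches publish in a goroutine, while the tests below call the unexported start directly. A minimal, self-contained sketch of that pattern (the server type and its fields here are illustrative stand-ins, not the real EtcdServer):

package main

import (
	"log"
	"time"
)

// server is an illustrative stand-in for EtcdServer.
type server struct {
	name string
	done chan struct{}
}

// start spins up the internal machinery only; tests call this directly so
// no background publisher races with their assertions.
func (s *server) start() {
	s.done = make(chan struct{})
	log.Printf("%s: started", s.name)
}

// Start is the public entry point: run start, then publish in the background.
func (s *server) Start() {
	s.start()
	go s.publish(5 * time.Second)
}

func (s *server) publish(retryInterval time.Duration) {
	// Retry loop elided here; see the publish sketch after the next hunk.
	log.Printf("%s: publishing member info (retry interval %v)", s.name, retryInterval)
}

func main() {
	s := &server{name: "node1"}
	s.Start()
	time.Sleep(10 * time.Millisecond) // give the demo's publish goroutine time to log
}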
@@ -319,10 +333,14 @@ func (s *EtcdServer) sync(timeout time.Duration) {
 }

 // publish registers server information into the cluster. The information
-// is the json format of the given member.
+// is the json format of its self member struct, whose ClientURLs may be
+// updated.
 // The function keeps attempting to register until it succeeds,
 // or its server is stopped.
-func (s *EtcdServer) publish(m Member, retryInterval time.Duration) {
+// TODO: take care of info fetched from cluster store after having reconfig.
+func (s *EtcdServer) publish(retryInterval time.Duration) {
+	m := *s.ClusterStore.Get().FindName(s.Name)
+	m.ClientURLs = s.ClientURLs
 	b, err := json.Marshal(m)
 	if err != nil {
 		log.Printf("etcdserver: json marshal error: %v", err)

View File

@@ -400,7 +400,7 @@ func testServer(t *testing.T, ns int64) {
 		Storage: &storageRecorder{},
 		Ticker: tk.C,
 	}
-	srv.Start()
+	srv.start()
 	// TODO(xiangli): randomize election timeout
 	// then remove this sleep.
 	time.Sleep(1 * time.Millisecond)
@@ -469,7 +469,7 @@ func TestDoProposal(t *testing.T) {
 		Storage: &storageRecorder{},
 		Ticker: tk,
 	}
-	srv.Start()
+	srv.start()
 	resp, err := srv.Do(ctx, tt)
 	srv.Stop()
@@ -539,7 +539,7 @@ func TestDoProposalStopped(t *testing.T) {
 		Storage: &storageRecorder{},
 		Ticker: tk,
 	}
-	srv.Start()
+	srv.start()
 	done := make(chan struct{})
 	var err error
@@ -639,7 +639,7 @@ func TestSyncTrigger(t *testing.T) {
 		Storage: &storageRecorder{},
 		SyncTicker: st,
 	}
-	srv.Start()
+	srv.start()
 	// trigger the server to become a leader and accept sync requests
 	n.readyc <- raft.Ready{
 		SoftState: &raft.SoftState{
@@ -710,7 +710,7 @@ func TestTriggerSnap(t *testing.T) {
 		SnapCount: 10,
 	}
-	s.Start()
+	s.start()
 	for i := 0; int64(i) < s.SnapCount; i++ {
 		s.Do(ctx, pb.Request{Method: "PUT", ID: 1})
 	}
@@ -741,7 +741,7 @@ func TestRecvSnapshot(t *testing.T) {
 		Node: n,
 	}
-	s.Start()
+	s.start()
 	n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Index: 1}}
 	// make goroutines move forward to receive snapshot
 	pkg.ForceGosched()
@@ -769,7 +769,7 @@ func TestRecvSlowSnapshot(t *testing.T) {
 		Node: n,
 	}
-	s.Start()
+	s.start()
 	n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Index: 1}}
 	// make goroutines move forward to receive snapshot
 	pkg.ForceGosched()
@@ -794,7 +794,7 @@ func TestAddNode(t *testing.T) {
 		Send: func(_ []raftpb.Message) {},
 		Storage: &storageRecorder{},
 	}
-	s.Start()
+	s.start()
 	s.AddNode(context.TODO(), 1, []byte("foo"))
 	gaction := n.Action()
 	s.Stop()
@@ -814,7 +814,7 @@ func TestRemoveNode(t *testing.T) {
 		Send: func(_ []raftpb.Message) {},
 		Storage: &storageRecorder{},
 	}
-	s.Start()
+	s.start()
 	s.RemoveNode(context.TODO(), 1)
 	gaction := n.Action()
 	s.Stop()
@@ -829,16 +829,19 @@ func TestRemoveNode(t *testing.T) {
 func TestPublish(t *testing.T) {
 	n := &nodeProposeDataRecorder{}
+	cs := mustClusterStore(t, []Member{{ID: 1, Name: "node1"}})
 	ch := make(chan interface{}, 1)
 	// simulate that request has gone through consensus
 	ch <- Response{}
 	w := &waitWithResponse{ch: ch}
 	srv := &EtcdServer{
-		Node: n,
-		w: w,
+		Name: "node1",
+		ClientURLs: []string{"a", "b"},
+		Node: n,
+		ClusterStore: cs,
+		w: w,
 	}
-	m := Member{ID: 1, Name: "node1"}
-	srv.publish(m, time.Hour)
+	srv.publish(time.Hour)

 	data := n.data()
 	if len(data) != 1 {
@@ -851,39 +854,46 @@ func TestPublish(t *testing.T) {
 	if r.Method != "PUT" {
 		t.Errorf("method = %s, want PUT", r.Method)
 	}
-	if r.Path != m.storeKey() {
-		t.Errorf("path = %s, want %s", r.Path, m.storeKey())
+	wm := Member{ID: 1, Name: "node1", ClientURLs: []string{"a", "b"}}
+	if r.Path != wm.storeKey() {
+		t.Errorf("path = %s, want %s", r.Path, wm.storeKey())
 	}
 	var gm Member
 	if err := json.Unmarshal([]byte(r.Val), &gm); err != nil {
 		t.Fatalf("unmarshal val error: %v", err)
 	}
-	if !reflect.DeepEqual(gm, m) {
-		t.Errorf("member = %v, want %v", gm, m)
+	if !reflect.DeepEqual(gm, wm) {
+		t.Errorf("member = %v, want %v", gm, wm)
 	}
 }

 // TestPublishStopped tests that publish will be stopped if server is stopped.
 func TestPublishStopped(t *testing.T) {
+	cs := mustClusterStore(t, []Member{{ID: 1, Name: "node1"}})
 	srv := &EtcdServer{
-		Node: &nodeRecorder{},
-		w: &waitRecorder{},
-		done: make(chan struct{}),
+		Name: "node1",
+		Node: &nodeRecorder{},
+		ClusterStore: cs,
+		w: &waitRecorder{},
+		done: make(chan struct{}),
 	}
 	srv.Stop()
-	srv.publish(Member{ID: 1, Name: "node1"}, time.Hour)
+	srv.publish(time.Hour)
 }

 // TestPublishRetry tests that publish will keep retry until success.
 func TestPublishRetry(t *testing.T) {
 	n := &nodeRecorder{}
+	cs := mustClusterStore(t, []Member{{ID: 1, Name: "node1"}})
 	srv := &EtcdServer{
-		Node: n,
-		w: &waitRecorder{},
-		done: make(chan struct{}),
+		Name: "node1",
+		Node: n,
+		ClusterStore: cs,
+		w: &waitRecorder{},
+		done: make(chan struct{}),
 	}
 	time.AfterFunc(500*time.Microsecond, srv.Stop)
-	srv.publish(Member{ID: 1, Name: "node1"}, 10*time.Nanosecond)
+	srv.publish(10 * time.Nanosecond)

 	action := n.Action()
 	// multiple Propose + Stop
@@ -1197,3 +1207,11 @@ func (w *waitWithResponse) Register(id int64) <-chan interface{} {
 	return w.ch
 }
 func (w *waitWithResponse) Trigger(id int64, x interface{}) {}
+
+func mustClusterStore(t *testing.T, membs []Member) ClusterStore {
+	c := Cluster{}
+	if err := c.AddSlice(membs); err != nil {
+		t.Fatalf("error creating cluster from %v: %v", membs, err)
+	}
+	return NewClusterStore(&getAllStore{}, c)
+}
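
As a quick reference for what TestPublish expects to find in r.Val, a tiny sketch that marshals a struct with the same fields; the field names and JSON tags here are assumptions, the real Member type lives in the etcdserver package:

package main

import (
	"encoding/json"
	"fmt"
)

// member mirrors the fields asserted in TestPublish; tags are assumed.
type member struct {
	ID         int64    `json:"id"`
	Name       string   `json:"name"`
	ClientURLs []string `json:"clientURLs,omitempty"`
}

func main() {
	b, _ := json.Marshal(member{ID: 1, Name: "node1", ClientURLs: []string{"a", "b"}})
	fmt.Println(string(b)) // {"id":1,"name":"node1","clientURLs":["a","b"]}
}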

View File

@@ -193,8 +193,10 @@ func startEtcd() {
 	cls := etcdserver.NewClusterStore(st, *cluster)
 	s := &etcdserver.EtcdServer{
-		Store: st,
-		Node: n,
+		Name: *name,
+		ClientURLs: strings.Split(acurls.String(), ","),
+		Store: st,
+		Node: n,
 		Storage: struct {
 			*wal.WAL
 			*snap.Snapshotter
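
On the startEtcd side, the new ClientURLs field is filled by splitting the string form of the advertised client URLs value on commas. A tiny illustration of just that step; the URL list below is made up, and acurls.String() is assumed to yield a comma-separated list:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Stand-in for acurls.String(): a comma-separated list of advertised client URLs.
	acurls := "http://10.0.0.1:4001,http://10.0.0.2:4001"
	clientURLs := strings.Split(acurls, ",")
	fmt.Println(clientURLs) // [http://10.0.0.1:4001 http://10.0.0.2:4001]
}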