diff --git a/etcdserver/server.go b/etcdserver/server.go index f9995004a..ccfaef427 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -19,6 +19,8 @@ import ( const ( defaultSyncTimeout = time.Second DefaultSnapCount = 10000 + // TODO: calculated based on heartbeat interval + defaultPublishRetryInterval = 5 * time.Second ) var ( @@ -78,6 +80,9 @@ type EtcdServer struct { w wait.Wait done chan struct{} + Name string + ClientURLs []string + Node raft.Node Store store.Store @@ -102,7 +107,16 @@ type EtcdServer struct { // Start prepares and starts server in a new goroutine. It is no longer safe to // modify a server's fields after it has been sent to Start. +// It also starts a goroutine to publish its server information. func (s *EtcdServer) Start() { + s.start() + go s.publish(defaultPublishRetryInterval) +} + +// start prepares and starts server in a new goroutine. It is no longer safe to +// modify a server's fields after it has been sent to Start. +// This function is just used for testing. +func (s *EtcdServer) start() { if s.SnapCount == 0 { log.Printf("etcdserver: set snapshot count to default %d", DefaultSnapCount) s.SnapCount = DefaultSnapCount @@ -319,10 +333,14 @@ func (s *EtcdServer) sync(timeout time.Duration) { } // publish registers server information into the cluster. The information -// is the json format of the given member. +// is the json format of its self member struct, whose ClientURLs may be +// updated. // The function keeps attempting to register until it succeeds, // or its server is stopped. -func (s *EtcdServer) publish(m Member, retryInterval time.Duration) { +// TODO: take care of info fetched from cluster store after having reconfig. +func (s *EtcdServer) publish(retryInterval time.Duration) { + m := *s.ClusterStore.Get().FindName(s.Name) + m.ClientURLs = s.ClientURLs b, err := json.Marshal(m) if err != nil { log.Printf("etcdserver: json marshal error: %v", err) diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 624f65175..105548833 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -400,7 +400,7 @@ func testServer(t *testing.T, ns int64) { Storage: &storageRecorder{}, Ticker: tk.C, } - srv.Start() + srv.start() // TODO(xiangli): randomize election timeout // then remove this sleep. time.Sleep(1 * time.Millisecond) @@ -469,7 +469,7 @@ func TestDoProposal(t *testing.T) { Storage: &storageRecorder{}, Ticker: tk, } - srv.Start() + srv.start() resp, err := srv.Do(ctx, tt) srv.Stop() @@ -539,7 +539,7 @@ func TestDoProposalStopped(t *testing.T) { Storage: &storageRecorder{}, Ticker: tk, } - srv.Start() + srv.start() done := make(chan struct{}) var err error @@ -639,7 +639,7 @@ func TestSyncTrigger(t *testing.T) { Storage: &storageRecorder{}, SyncTicker: st, } - srv.Start() + srv.start() // trigger the server to become a leader and accept sync requests n.readyc <- raft.Ready{ SoftState: &raft.SoftState{ @@ -710,7 +710,7 @@ func TestTriggerSnap(t *testing.T) { SnapCount: 10, } - s.Start() + s.start() for i := 0; int64(i) < s.SnapCount; i++ { s.Do(ctx, pb.Request{Method: "PUT", ID: 1}) } @@ -741,7 +741,7 @@ func TestRecvSnapshot(t *testing.T) { Node: n, } - s.Start() + s.start() n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Index: 1}} // make goroutines move forward to receive snapshot pkg.ForceGosched() @@ -769,7 +769,7 @@ func TestRecvSlowSnapshot(t *testing.T) { Node: n, } - s.Start() + s.start() n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Index: 1}} // make goroutines move forward to receive snapshot pkg.ForceGosched() @@ -794,7 +794,7 @@ func TestAddNode(t *testing.T) { Send: func(_ []raftpb.Message) {}, Storage: &storageRecorder{}, } - s.Start() + s.start() s.AddNode(context.TODO(), 1, []byte("foo")) gaction := n.Action() s.Stop() @@ -814,7 +814,7 @@ func TestRemoveNode(t *testing.T) { Send: func(_ []raftpb.Message) {}, Storage: &storageRecorder{}, } - s.Start() + s.start() s.RemoveNode(context.TODO(), 1) gaction := n.Action() s.Stop() @@ -829,16 +829,19 @@ func TestRemoveNode(t *testing.T) { func TestPublish(t *testing.T) { n := &nodeProposeDataRecorder{} + cs := mustClusterStore(t, []Member{{ID: 1, Name: "node1"}}) ch := make(chan interface{}, 1) // simulate that request has gone through consensus ch <- Response{} w := &waitWithResponse{ch: ch} srv := &EtcdServer{ - Node: n, - w: w, + Name: "node1", + ClientURLs: []string{"a", "b"}, + Node: n, + ClusterStore: cs, + w: w, } - m := Member{ID: 1, Name: "node1"} - srv.publish(m, time.Hour) + srv.publish(time.Hour) data := n.data() if len(data) != 1 { @@ -851,39 +854,46 @@ func TestPublish(t *testing.T) { if r.Method != "PUT" { t.Errorf("method = %s, want PUT", r.Method) } - if r.Path != m.storeKey() { - t.Errorf("path = %s, want %s", r.Path, m.storeKey()) + wm := Member{ID: 1, Name: "node1", ClientURLs: []string{"a", "b"}} + if r.Path != wm.storeKey() { + t.Errorf("path = %s, want %s", r.Path, wm.storeKey()) } var gm Member if err := json.Unmarshal([]byte(r.Val), &gm); err != nil { t.Fatalf("unmarshal val error: %v", err) } - if !reflect.DeepEqual(gm, m) { - t.Errorf("member = %v, want %v", gm, m) + if !reflect.DeepEqual(gm, wm) { + t.Errorf("member = %v, want %v", gm, wm) } } // TestPublishStopped tests that publish will be stopped if server is stopped. func TestPublishStopped(t *testing.T) { + cs := mustClusterStore(t, []Member{{ID: 1, Name: "node1"}}) srv := &EtcdServer{ - Node: &nodeRecorder{}, - w: &waitRecorder{}, - done: make(chan struct{}), + Name: "node1", + Node: &nodeRecorder{}, + ClusterStore: cs, + w: &waitRecorder{}, + done: make(chan struct{}), } srv.Stop() - srv.publish(Member{ID: 1, Name: "node1"}, time.Hour) + srv.publish(time.Hour) } // TestPublishRetry tests that publish will keep retry until success. func TestPublishRetry(t *testing.T) { n := &nodeRecorder{} + cs := mustClusterStore(t, []Member{{ID: 1, Name: "node1"}}) srv := &EtcdServer{ - Node: n, - w: &waitRecorder{}, - done: make(chan struct{}), + Name: "node1", + Node: n, + ClusterStore: cs, + w: &waitRecorder{}, + done: make(chan struct{}), } time.AfterFunc(500*time.Microsecond, srv.Stop) - srv.publish(Member{ID: 1, Name: "node1"}, 10*time.Nanosecond) + srv.publish(10 * time.Nanosecond) action := n.Action() // multiple Propose + Stop @@ -1197,3 +1207,11 @@ func (w *waitWithResponse) Register(id int64) <-chan interface{} { return w.ch } func (w *waitWithResponse) Trigger(id int64, x interface{}) {} + +func mustClusterStore(t *testing.T, membs []Member) ClusterStore { + c := Cluster{} + if err := c.AddSlice(membs); err != nil { + t.Fatalf("error creating cluster from %v: %v", membs, err) + } + return NewClusterStore(&getAllStore{}, c) +} diff --git a/main.go b/main.go index 8e3fac9d9..1835b73f5 100644 --- a/main.go +++ b/main.go @@ -193,8 +193,10 @@ func startEtcd() { cls := etcdserver.NewClusterStore(st, *cluster) s := &etcdserver.EtcdServer{ - Store: st, - Node: n, + Name: *name, + ClientURLs: strings.Split(acurls.String(), ","), + Store: st, + Node: n, Storage: struct { *wal.WAL *snap.Snapshotter