From a40a270e19fcd78925a5d18fdc2d19c940ad6a3a Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 29 Sep 2014 11:52:36 -0700 Subject: [PATCH 1/3] etcdserver: publish self info when start --- etcdserver/etcdhttp/http_test.go | 6 +++--- etcdserver/server.go | 15 +++++++++++++-- etcdserver/server_test.go | 18 +++++++++--------- main.go | 4 +++- 4 files changed, 28 insertions(+), 15 deletions(-) diff --git a/etcdserver/etcdhttp/http_test.go b/etcdserver/etcdhttp/http_test.go index 46ebfe7b4..962e91a1f 100644 --- a/etcdserver/etcdhttp/http_test.go +++ b/etcdserver/etcdhttp/http_test.go @@ -712,8 +712,8 @@ func (fs *errServer) Do(ctx context.Context, r etcdserverpb.Request) (etcdserver func (fs *errServer) Process(ctx context.Context, m raftpb.Message) error { return fs.err } -func (fs *errServer) Start() {} -func (fs *errServer) Stop() {} +func (fs *errServer) Start(m etcdserver.Member) {} +func (fs *errServer) Stop() {} // errReader implements io.Reader to facilitate a broken request. type errReader struct{} @@ -838,7 +838,7 @@ func (rs *resServer) Do(_ context.Context, _ etcdserverpb.Request) (etcdserver.R return rs.res, nil } func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil } -func (rs *resServer) Start() {} +func (rs *resServer) Start(m etcdserver.Member) {} func (rs *resServer) Stop() {} func mustMarshalEvent(t *testing.T, ev *store.Event) string { diff --git a/etcdserver/server.go b/etcdserver/server.go index f9995004a..1052a20da 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -19,6 +19,8 @@ import ( const ( defaultSyncTimeout = time.Second DefaultSnapCount = 10000 + // TODO: calculated based on heartbeat interval + defaultPublishRetryInterval = 5 * time.Second ) var ( @@ -57,7 +59,7 @@ type Server interface { // begin serving requests. It must be called before Do or Process. // Start must be non-blocking; any long-running server functionality // should be implemented in goroutines. - Start() + Start(m Member) // Stop terminates the Server and performs any necessary finalization. // Do and Process cannot be called after Stop has been invoked. Stop() @@ -102,7 +104,16 @@ type EtcdServer struct { // Start prepares and starts server in a new goroutine. It is no longer safe to // modify a server's fields after it has been sent to Start. -func (s *EtcdServer) Start() { +// It also starts a goroutine to publish its server information. +func (s *EtcdServer) Start(m Member) { + s.start() + go s.publish(m, defaultPublishRetryInterval) +} + +// start prepares and starts server in a new goroutine. It is no longer safe to +// modify a server's fields after it has been sent to Start. +// This function is just used for testing. +func (s *EtcdServer) start() { if s.SnapCount == 0 { log.Printf("etcdserver: set snapshot count to default %d", DefaultSnapCount) s.SnapCount = DefaultSnapCount diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 624f65175..22a473c9b 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -400,7 +400,7 @@ func testServer(t *testing.T, ns int64) { Storage: &storageRecorder{}, Ticker: tk.C, } - srv.Start() + srv.start() // TODO(xiangli): randomize election timeout // then remove this sleep. time.Sleep(1 * time.Millisecond) @@ -469,7 +469,7 @@ func TestDoProposal(t *testing.T) { Storage: &storageRecorder{}, Ticker: tk, } - srv.Start() + srv.start() resp, err := srv.Do(ctx, tt) srv.Stop() @@ -539,7 +539,7 @@ func TestDoProposalStopped(t *testing.T) { Storage: &storageRecorder{}, Ticker: tk, } - srv.Start() + srv.start() done := make(chan struct{}) var err error @@ -639,7 +639,7 @@ func TestSyncTrigger(t *testing.T) { Storage: &storageRecorder{}, SyncTicker: st, } - srv.Start() + srv.start() // trigger the server to become a leader and accept sync requests n.readyc <- raft.Ready{ SoftState: &raft.SoftState{ @@ -710,7 +710,7 @@ func TestTriggerSnap(t *testing.T) { SnapCount: 10, } - s.Start() + s.start() for i := 0; int64(i) < s.SnapCount; i++ { s.Do(ctx, pb.Request{Method: "PUT", ID: 1}) } @@ -741,7 +741,7 @@ func TestRecvSnapshot(t *testing.T) { Node: n, } - s.Start() + s.start() n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Index: 1}} // make goroutines move forward to receive snapshot pkg.ForceGosched() @@ -769,7 +769,7 @@ func TestRecvSlowSnapshot(t *testing.T) { Node: n, } - s.Start() + s.start() n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Index: 1}} // make goroutines move forward to receive snapshot pkg.ForceGosched() @@ -794,7 +794,7 @@ func TestAddNode(t *testing.T) { Send: func(_ []raftpb.Message) {}, Storage: &storageRecorder{}, } - s.Start() + s.start() s.AddNode(context.TODO(), 1, []byte("foo")) gaction := n.Action() s.Stop() @@ -814,7 +814,7 @@ func TestRemoveNode(t *testing.T) { Send: func(_ []raftpb.Message) {}, Storage: &storageRecorder{}, } - s.Start() + s.start() s.RemoveNode(context.TODO(), 1) gaction := n.Action() s.Stop() diff --git a/main.go b/main.go index 8e3fac9d9..5cd98e5df 100644 --- a/main.go +++ b/main.go @@ -205,7 +205,9 @@ func startEtcd() { SnapCount: *snapCount, ClusterStore: cls, } - s.Start() + member := *self + member.ClientURLs = *addrs + s.Start(member) ch := &pkg.CORSHandler{ Handler: etcdhttp.NewClientHandler(s, cls, *timeout), From cbc84bc70ef9e8445909521e9e1bb300e0387cb9 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 29 Sep 2014 17:14:22 -0700 Subject: [PATCH 2/3] etcdserver: minimize input info --- etcdserver/etcdhttp/http_test.go | 6 +++--- etcdserver/server.go | 9 +++++++-- main.go | 10 +++++----- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/etcdserver/etcdhttp/http_test.go b/etcdserver/etcdhttp/http_test.go index 962e91a1f..46ebfe7b4 100644 --- a/etcdserver/etcdhttp/http_test.go +++ b/etcdserver/etcdhttp/http_test.go @@ -712,8 +712,8 @@ func (fs *errServer) Do(ctx context.Context, r etcdserverpb.Request) (etcdserver func (fs *errServer) Process(ctx context.Context, m raftpb.Message) error { return fs.err } -func (fs *errServer) Start(m etcdserver.Member) {} -func (fs *errServer) Stop() {} +func (fs *errServer) Start() {} +func (fs *errServer) Stop() {} // errReader implements io.Reader to facilitate a broken request. type errReader struct{} @@ -838,7 +838,7 @@ func (rs *resServer) Do(_ context.Context, _ etcdserverpb.Request) (etcdserver.R return rs.res, nil } func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil } -func (rs *resServer) Start(m etcdserver.Member) {} +func (rs *resServer) Start() {} func (rs *resServer) Stop() {} func mustMarshalEvent(t *testing.T, ev *store.Event) string { diff --git a/etcdserver/server.go b/etcdserver/server.go index 1052a20da..03736a0dc 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -59,7 +59,7 @@ type Server interface { // begin serving requests. It must be called before Do or Process. // Start must be non-blocking; any long-running server functionality // should be implemented in goroutines. - Start(m Member) + Start() // Stop terminates the Server and performs any necessary finalization. // Do and Process cannot be called after Stop has been invoked. Stop() @@ -80,6 +80,9 @@ type EtcdServer struct { w wait.Wait done chan struct{} + Name string + ClientURLs []string + Node raft.Node Store store.Store @@ -105,8 +108,10 @@ type EtcdServer struct { // Start prepares and starts server in a new goroutine. It is no longer safe to // modify a server's fields after it has been sent to Start. // It also starts a goroutine to publish its server information. -func (s *EtcdServer) Start(m Member) { +func (s *EtcdServer) Start() { s.start() + m := *s.ClusterStore.Get().FindName(s.Name) + m.ClientURLs = s.ClientURLs go s.publish(m, defaultPublishRetryInterval) } diff --git a/main.go b/main.go index 5cd98e5df..33bcbe668 100644 --- a/main.go +++ b/main.go @@ -193,8 +193,10 @@ func startEtcd() { cls := etcdserver.NewClusterStore(st, *cluster) s := &etcdserver.EtcdServer{ - Store: st, - Node: n, + Name: *name, + ClientURLs: *addrs, + Store: st, + Node: n, Storage: struct { *wal.WAL *snap.Snapshotter @@ -205,9 +207,7 @@ func startEtcd() { SnapCount: *snapCount, ClusterStore: cls, } - member := *self - member.ClientURLs = *addrs - s.Start(member) + s.Start() ch := &pkg.CORSHandler{ Handler: etcdhttp.NewClientHandler(s, cls, *timeout), From 073eb7677dd33a43869cc6349844acd5925621df Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Wed, 1 Oct 2014 09:33:55 -0700 Subject: [PATCH 3/3] etcdserver: move grep member logic into publish func --- etcdserver/server.go | 12 ++++++---- etcdserver/server_test.go | 50 ++++++++++++++++++++++++++------------- main.go | 2 +- 3 files changed, 42 insertions(+), 22 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index 03736a0dc..ccfaef427 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -110,9 +110,7 @@ type EtcdServer struct { // It also starts a goroutine to publish its server information. func (s *EtcdServer) Start() { s.start() - m := *s.ClusterStore.Get().FindName(s.Name) - m.ClientURLs = s.ClientURLs - go s.publish(m, defaultPublishRetryInterval) + go s.publish(defaultPublishRetryInterval) } // start prepares and starts server in a new goroutine. It is no longer safe to @@ -335,10 +333,14 @@ func (s *EtcdServer) sync(timeout time.Duration) { } // publish registers server information into the cluster. The information -// is the json format of the given member. +// is the json format of its self member struct, whose ClientURLs may be +// updated. // The function keeps attempting to register until it succeeds, // or its server is stopped. -func (s *EtcdServer) publish(m Member, retryInterval time.Duration) { +// TODO: take care of info fetched from cluster store after having reconfig. +func (s *EtcdServer) publish(retryInterval time.Duration) { + m := *s.ClusterStore.Get().FindName(s.Name) + m.ClientURLs = s.ClientURLs b, err := json.Marshal(m) if err != nil { log.Printf("etcdserver: json marshal error: %v", err) diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 22a473c9b..105548833 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -829,16 +829,19 @@ func TestRemoveNode(t *testing.T) { func TestPublish(t *testing.T) { n := &nodeProposeDataRecorder{} + cs := mustClusterStore(t, []Member{{ID: 1, Name: "node1"}}) ch := make(chan interface{}, 1) // simulate that request has gone through consensus ch <- Response{} w := &waitWithResponse{ch: ch} srv := &EtcdServer{ - Node: n, - w: w, + Name: "node1", + ClientURLs: []string{"a", "b"}, + Node: n, + ClusterStore: cs, + w: w, } - m := Member{ID: 1, Name: "node1"} - srv.publish(m, time.Hour) + srv.publish(time.Hour) data := n.data() if len(data) != 1 { @@ -851,39 +854,46 @@ func TestPublish(t *testing.T) { if r.Method != "PUT" { t.Errorf("method = %s, want PUT", r.Method) } - if r.Path != m.storeKey() { - t.Errorf("path = %s, want %s", r.Path, m.storeKey()) + wm := Member{ID: 1, Name: "node1", ClientURLs: []string{"a", "b"}} + if r.Path != wm.storeKey() { + t.Errorf("path = %s, want %s", r.Path, wm.storeKey()) } var gm Member if err := json.Unmarshal([]byte(r.Val), &gm); err != nil { t.Fatalf("unmarshal val error: %v", err) } - if !reflect.DeepEqual(gm, m) { - t.Errorf("member = %v, want %v", gm, m) + if !reflect.DeepEqual(gm, wm) { + t.Errorf("member = %v, want %v", gm, wm) } } // TestPublishStopped tests that publish will be stopped if server is stopped. func TestPublishStopped(t *testing.T) { + cs := mustClusterStore(t, []Member{{ID: 1, Name: "node1"}}) srv := &EtcdServer{ - Node: &nodeRecorder{}, - w: &waitRecorder{}, - done: make(chan struct{}), + Name: "node1", + Node: &nodeRecorder{}, + ClusterStore: cs, + w: &waitRecorder{}, + done: make(chan struct{}), } srv.Stop() - srv.publish(Member{ID: 1, Name: "node1"}, time.Hour) + srv.publish(time.Hour) } // TestPublishRetry tests that publish will keep retry until success. func TestPublishRetry(t *testing.T) { n := &nodeRecorder{} + cs := mustClusterStore(t, []Member{{ID: 1, Name: "node1"}}) srv := &EtcdServer{ - Node: n, - w: &waitRecorder{}, - done: make(chan struct{}), + Name: "node1", + Node: n, + ClusterStore: cs, + w: &waitRecorder{}, + done: make(chan struct{}), } time.AfterFunc(500*time.Microsecond, srv.Stop) - srv.publish(Member{ID: 1, Name: "node1"}, 10*time.Nanosecond) + srv.publish(10 * time.Nanosecond) action := n.Action() // multiple Propose + Stop @@ -1197,3 +1207,11 @@ func (w *waitWithResponse) Register(id int64) <-chan interface{} { return w.ch } func (w *waitWithResponse) Trigger(id int64, x interface{}) {} + +func mustClusterStore(t *testing.T, membs []Member) ClusterStore { + c := Cluster{} + if err := c.AddSlice(membs); err != nil { + t.Fatalf("error creating cluster from %v: %v", membs, err) + } + return NewClusterStore(&getAllStore{}, c) +} diff --git a/main.go b/main.go index 33bcbe668..1835b73f5 100644 --- a/main.go +++ b/main.go @@ -194,7 +194,7 @@ func startEtcd() { s := &etcdserver.EtcdServer{ Name: *name, - ClientURLs: *addrs, + ClientURLs: strings.Split(acurls.String(), ","), Store: st, Node: n, Storage: struct {