From 8c91d60a6f673dae4285d282169eff5d58fa548a Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 10 Feb 2022 19:56:29 +0100 Subject: [PATCH] server: Switch to publishV3 --- server/etcdserver/server.go | 67 +---------------- server/etcdserver/server_test.go | 124 ------------------------------- 2 files changed, 1 insertion(+), 190 deletions(-) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index b93158a81..8d6a430cc 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -511,9 +511,7 @@ func (s *EtcdServer) adjustTicks() { func (s *EtcdServer) Start() { s.start() s.GoAttach(func() { s.adjustTicks() }) - // TODO: Switch to publishV3 in 3.6. - // Support for cluster_member_set_attr was added in 3.5. - s.GoAttach(func() { s.publish(s.Cfg.ReqTimeout()) }) + s.GoAttach(func() { s.publishV3(s.Cfg.ReqTimeout()) }) s.GoAttach(s.purgeFile) s.GoAttach(func() { monitorFileDescriptor(s.Logger(), s.stopping) }) s.GoAttach(s.monitorClusterVersions) @@ -1698,69 +1696,6 @@ func (s *EtcdServer) publishV3(timeout time.Duration) { } } -// publish registers server information into the cluster. The information -// is the JSON representation of this server's member struct, updated with the -// static clientURLs of the server. -// The function keeps attempting to register until it succeeds, -// or its server is stopped. -// -// Use v2 store to encode member attributes, and apply through Raft -// but does not go through v2 API endpoint, which means cluster can still -// process publish requests through rafthttp -// TODO: Remove in 3.6 (start using publishV3) -func (s *EtcdServer) publish(timeout time.Duration) { - lg := s.Logger() - b, err := json.Marshal(s.attributes) - if err != nil { - lg.Panic("failed to marshal JSON", zap.Error(err)) - return - } - req := pb.Request{ - Method: "PUT", - Path: membership.MemberAttributesStorePath(s.id), - Val: string(b), - } - - for { - ctx, cancel := context.WithTimeout(s.ctx, timeout) - _, err := s.Do(ctx, req) - cancel() - switch err { - case nil: - close(s.readych) - lg.Info( - "published local member to cluster through raft", - zap.String("local-member-id", s.ID().String()), - zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)), - zap.String("request-path", req.Path), - zap.String("cluster-id", s.cluster.ID().String()), - zap.Duration("publish-timeout", timeout), - ) - return - - case ErrStopped: - lg.Warn( - "stopped publish because server is stopped", - zap.String("local-member-id", s.ID().String()), - zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)), - zap.Duration("publish-timeout", timeout), - zap.Error(err), - ) - return - - default: - lg.Warn( - "failed to publish local member to cluster through raft", - zap.String("local-member-id", s.ID().String()), - zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)), - zap.String("request-path", req.Path), - zap.Duration("publish-timeout", timeout), - zap.Error(err), - ) - } - } -} - func (s *EtcdServer) sendMergedSnap(merged snap.Message) { atomic.AddInt64(&s.inflightSnapshots, 1) diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 32d368d8f..29b238cd0 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -1473,130 +1473,6 @@ func TestUpdateMember(t *testing.T) { // TODO: test server could stop itself when being removed -func TestPublish(t *testing.T) { - lg := zaptest.NewLogger(t) - n := newNodeRecorder() - ch := make(chan interface{}, 1) - // simulate that request has gone through consensus - ch <- Response{} - w := wait.NewWithResponse(ch) - ctx, cancel := context.WithCancel(context.Background()) - srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: lg, - readych: make(chan struct{}), - Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, - id: 1, - r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}), - attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}, - cluster: &membership.RaftCluster{}, - w: w, - reqIDGen: idutil.NewGenerator(0, time.Time{}), - SyncTicker: &time.Ticker{}, - - ctx: ctx, - cancel: cancel, - } - srv.publish(time.Hour) - - action := n.Action() - if len(action) != 1 { - t.Fatalf("len(action) = %d, want 1", len(action)) - } - if action[0].Name != "Propose" { - t.Fatalf("action = %s, want Propose", action[0].Name) - } - data := action[0].Params[0].([]byte) - var r pb.Request - if err := r.Unmarshal(data); err != nil { - t.Fatalf("unmarshal request error: %v", err) - } - if r.Method != "PUT" { - t.Errorf("method = %s, want PUT", r.Method) - } - wm := membership.Member{ID: 1, Attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}} - if wpath := membership.MemberAttributesStorePath(wm.ID); r.Path != wpath { - t.Errorf("path = %s, want %s", r.Path, wpath) - } - var gattr membership.Attributes - if err := json.Unmarshal([]byte(r.Val), &gattr); err != nil { - t.Fatalf("unmarshal val error: %v", err) - } - if !reflect.DeepEqual(gattr, wm.Attributes) { - t.Errorf("member = %v, want %v", gattr, wm.Attributes) - } -} - -// TestPublishStopped tests that publish will be stopped if server is stopped. -func TestPublishStopped(t *testing.T) { - lg := zaptest.NewLogger(t) - ctx, cancel := context.WithCancel(context.Background()) - r := newRaftNode(raftNodeConfig{ - lg: lg, - Node: newNodeNop(), - transport: newNopTransporter(), - }) - srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: lg, - Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, - r: *r, - cluster: &membership.RaftCluster{}, - w: mockwait.NewNop(), - done: make(chan struct{}), - stopping: make(chan struct{}), - stop: make(chan struct{}), - reqIDGen: idutil.NewGenerator(0, time.Time{}), - SyncTicker: &time.Ticker{}, - - ctx: ctx, - cancel: cancel, - } - close(srv.stopping) - srv.publish(time.Hour) -} - -// TestPublishRetry tests that publish will keep retry until success. -func TestPublishRetry(t *testing.T) { - lg := zaptest.NewLogger(t) - - ctx, cancel := context.WithCancel(context.Background()) - n := newNodeRecorderStream() - srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: lg, - Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, - r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}), - w: mockwait.NewNop(), - stopping: make(chan struct{}), - reqIDGen: idutil.NewGenerator(0, time.Time{}), - SyncTicker: &time.Ticker{}, - ctx: ctx, - cancel: cancel, - } - // expect multiple proposals from retrying - ch := make(chan struct{}) - go func() { - defer close(ch) - if action, err := n.Wait(2); err != nil { - t.Errorf("len(action) = %d, want >= 2 (%v)", len(action), err) - } - close(srv.stopping) - // drain remaining actions, if any, so publish can terminate - for { - select { - case <-ch: - return - default: - n.Action() - } - } - }() - srv.publish(10 * time.Nanosecond) - ch <- struct{}{} - <-ch -} - func TestPublishV3(t *testing.T) { n := newNodeRecorder() ch := make(chan interface{}, 1)