mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
server: Switch to publishV3
This commit is contained in:
parent
aa75fd0850
commit
8c91d60a6f
@ -511,9 +511,7 @@ func (s *EtcdServer) adjustTicks() {
|
||||
func (s *EtcdServer) Start() {
|
||||
s.start()
|
||||
s.GoAttach(func() { s.adjustTicks() })
|
||||
// TODO: Switch to publishV3 in 3.6.
|
||||
// Support for cluster_member_set_attr was added in 3.5.
|
||||
s.GoAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
|
||||
s.GoAttach(func() { s.publishV3(s.Cfg.ReqTimeout()) })
|
||||
s.GoAttach(s.purgeFile)
|
||||
s.GoAttach(func() { monitorFileDescriptor(s.Logger(), s.stopping) })
|
||||
s.GoAttach(s.monitorClusterVersions)
|
||||
@ -1698,69 +1696,6 @@ func (s *EtcdServer) publishV3(timeout time.Duration) {
|
||||
}
|
||||
}
|
||||
|
||||
// publish registers server information into the cluster. The information
|
||||
// is the JSON representation of this server's member struct, updated with the
|
||||
// static clientURLs of the server.
|
||||
// The function keeps attempting to register until it succeeds,
|
||||
// or its server is stopped.
|
||||
//
|
||||
// Use v2 store to encode member attributes, and apply through Raft
|
||||
// but does not go through v2 API endpoint, which means cluster can still
|
||||
// process publish requests through rafthttp
|
||||
// TODO: Remove in 3.6 (start using publishV3)
|
||||
func (s *EtcdServer) publish(timeout time.Duration) {
|
||||
lg := s.Logger()
|
||||
b, err := json.Marshal(s.attributes)
|
||||
if err != nil {
|
||||
lg.Panic("failed to marshal JSON", zap.Error(err))
|
||||
return
|
||||
}
|
||||
req := pb.Request{
|
||||
Method: "PUT",
|
||||
Path: membership.MemberAttributesStorePath(s.id),
|
||||
Val: string(b),
|
||||
}
|
||||
|
||||
for {
|
||||
ctx, cancel := context.WithTimeout(s.ctx, timeout)
|
||||
_, err := s.Do(ctx, req)
|
||||
cancel()
|
||||
switch err {
|
||||
case nil:
|
||||
close(s.readych)
|
||||
lg.Info(
|
||||
"published local member to cluster through raft",
|
||||
zap.String("local-member-id", s.ID().String()),
|
||||
zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
|
||||
zap.String("request-path", req.Path),
|
||||
zap.String("cluster-id", s.cluster.ID().String()),
|
||||
zap.Duration("publish-timeout", timeout),
|
||||
)
|
||||
return
|
||||
|
||||
case ErrStopped:
|
||||
lg.Warn(
|
||||
"stopped publish because server is stopped",
|
||||
zap.String("local-member-id", s.ID().String()),
|
||||
zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
|
||||
zap.Duration("publish-timeout", timeout),
|
||||
zap.Error(err),
|
||||
)
|
||||
return
|
||||
|
||||
default:
|
||||
lg.Warn(
|
||||
"failed to publish local member to cluster through raft",
|
||||
zap.String("local-member-id", s.ID().String()),
|
||||
zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
|
||||
zap.String("request-path", req.Path),
|
||||
zap.Duration("publish-timeout", timeout),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
|
||||
atomic.AddInt64(&s.inflightSnapshots, 1)
|
||||
|
||||
|
@ -1473,130 +1473,6 @@ func TestUpdateMember(t *testing.T) {
|
||||
|
||||
// TODO: test server could stop itself when being removed
|
||||
|
||||
func TestPublish(t *testing.T) {
|
||||
lg := zaptest.NewLogger(t)
|
||||
n := newNodeRecorder()
|
||||
ch := make(chan interface{}, 1)
|
||||
// simulate that request has gone through consensus
|
||||
ch <- Response{}
|
||||
w := wait.NewWithResponse(ch)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
srv := &EtcdServer{
|
||||
lgMu: new(sync.RWMutex),
|
||||
lg: lg,
|
||||
readych: make(chan struct{}),
|
||||
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
|
||||
id: 1,
|
||||
r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}),
|
||||
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
|
||||
cluster: &membership.RaftCluster{},
|
||||
w: w,
|
||||
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||
SyncTicker: &time.Ticker{},
|
||||
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
srv.publish(time.Hour)
|
||||
|
||||
action := n.Action()
|
||||
if len(action) != 1 {
|
||||
t.Fatalf("len(action) = %d, want 1", len(action))
|
||||
}
|
||||
if action[0].Name != "Propose" {
|
||||
t.Fatalf("action = %s, want Propose", action[0].Name)
|
||||
}
|
||||
data := action[0].Params[0].([]byte)
|
||||
var r pb.Request
|
||||
if err := r.Unmarshal(data); err != nil {
|
||||
t.Fatalf("unmarshal request error: %v", err)
|
||||
}
|
||||
if r.Method != "PUT" {
|
||||
t.Errorf("method = %s, want PUT", r.Method)
|
||||
}
|
||||
wm := membership.Member{ID: 1, Attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}}
|
||||
if wpath := membership.MemberAttributesStorePath(wm.ID); r.Path != wpath {
|
||||
t.Errorf("path = %s, want %s", r.Path, wpath)
|
||||
}
|
||||
var gattr membership.Attributes
|
||||
if err := json.Unmarshal([]byte(r.Val), &gattr); err != nil {
|
||||
t.Fatalf("unmarshal val error: %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(gattr, wm.Attributes) {
|
||||
t.Errorf("member = %v, want %v", gattr, wm.Attributes)
|
||||
}
|
||||
}
|
||||
|
||||
// TestPublishStopped tests that publish will be stopped if server is stopped.
|
||||
func TestPublishStopped(t *testing.T) {
|
||||
lg := zaptest.NewLogger(t)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
r := newRaftNode(raftNodeConfig{
|
||||
lg: lg,
|
||||
Node: newNodeNop(),
|
||||
transport: newNopTransporter(),
|
||||
})
|
||||
srv := &EtcdServer{
|
||||
lgMu: new(sync.RWMutex),
|
||||
lg: lg,
|
||||
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
|
||||
r: *r,
|
||||
cluster: &membership.RaftCluster{},
|
||||
w: mockwait.NewNop(),
|
||||
done: make(chan struct{}),
|
||||
stopping: make(chan struct{}),
|
||||
stop: make(chan struct{}),
|
||||
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||
SyncTicker: &time.Ticker{},
|
||||
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
close(srv.stopping)
|
||||
srv.publish(time.Hour)
|
||||
}
|
||||
|
||||
// TestPublishRetry tests that publish will keep retry until success.
|
||||
func TestPublishRetry(t *testing.T) {
|
||||
lg := zaptest.NewLogger(t)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
n := newNodeRecorderStream()
|
||||
srv := &EtcdServer{
|
||||
lgMu: new(sync.RWMutex),
|
||||
lg: lg,
|
||||
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
|
||||
r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}),
|
||||
w: mockwait.NewNop(),
|
||||
stopping: make(chan struct{}),
|
||||
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||
SyncTicker: &time.Ticker{},
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
// expect multiple proposals from retrying
|
||||
ch := make(chan struct{})
|
||||
go func() {
|
||||
defer close(ch)
|
||||
if action, err := n.Wait(2); err != nil {
|
||||
t.Errorf("len(action) = %d, want >= 2 (%v)", len(action), err)
|
||||
}
|
||||
close(srv.stopping)
|
||||
// drain remaining actions, if any, so publish can terminate
|
||||
for {
|
||||
select {
|
||||
case <-ch:
|
||||
return
|
||||
default:
|
||||
n.Action()
|
||||
}
|
||||
}
|
||||
}()
|
||||
srv.publish(10 * time.Nanosecond)
|
||||
ch <- struct{}{}
|
||||
<-ch
|
||||
}
|
||||
|
||||
func TestPublishV3(t *testing.T) {
|
||||
n := newNodeRecorder()
|
||||
ch := make(chan interface{}, 1)
|
||||
|
Loading…
x
Reference in New Issue
Block a user