diff --git a/server/etcdserver/api/etcdhttp/peer_test.go b/server/etcdserver/api/etcdhttp/peer_test.go index 8fe300cd1..3770f29ed 100644 --- a/server/etcdserver/api/etcdhttp/peer_test.go +++ b/server/etcdserver/api/etcdhttp/peer_test.go @@ -72,9 +72,10 @@ func (s *fakeServer) UpdateMember(ctx context.Context, updateMemb membership.Mem func (s *fakeServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) { return nil, fmt.Errorf("PromoteMember not implemented in fakeServer") } -func (s *fakeServer) ClusterVersion() *semver.Version { return nil } -func (s *fakeServer) Cluster() api.Cluster { return s.cluster } -func (s *fakeServer) Alarms() []*pb.AlarmMember { return nil } +func (s *fakeServer) ClusterVersion() *semver.Version { return nil } +func (s *fakeServer) Cluster() api.Cluster { return s.cluster } +func (s *fakeServer) Alarms() []*pb.AlarmMember { return nil } +func (s *fakeServer) LeaderChangedNotify() <-chan struct{} { return nil } var fakeRaftHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Write([]byte("test data")) diff --git a/server/etcdserver/api/v2http/client_test.go b/server/etcdserver/api/v2http/client_test.go index bdb455c0e..cb4be0b26 100644 --- a/server/etcdserver/api/v2http/client_test.go +++ b/server/etcdserver/api/v2http/client_test.go @@ -95,11 +95,12 @@ type fakeServer struct { dummyStats } -func (s *fakeServer) Leader() types.ID { return types.ID(1) } -func (s *fakeServer) Alarms() []*etcdserverpb.AlarmMember { return nil } -func (s *fakeServer) Cluster() api.Cluster { return nil } -func (s *fakeServer) ClusterVersion() *semver.Version { return nil } -func (s *fakeServer) RaftHandler() http.Handler { return nil } +func (s *fakeServer) Leader() types.ID { return types.ID(1) } +func (s *fakeServer) Alarms() []*etcdserverpb.AlarmMember { return nil } +func (s *fakeServer) LeaderChangedNotify() <-chan struct{} { return nil } +func (s *fakeServer) Cluster() api.Cluster { return nil } +func (s *fakeServer) ClusterVersion() *semver.Version { return nil } +func (s *fakeServer) RaftHandler() http.Handler { return nil } func (s *fakeServer) Do(ctx context.Context, r etcdserverpb.Request) (rr etcdserver.Response, err error) { return } diff --git a/server/etcdserver/api/v2v3/server.go b/server/etcdserver/api/v2v3/server.go index 0cdb5c636..be1b43c52 100644 --- a/server/etcdserver/api/v2v3/server.go +++ b/server/etcdserver/api/v2v3/server.go @@ -113,9 +113,10 @@ func v3MembersToMembership(v3membs []*pb.Member) []*membership.Member { return membs } -func (s *v2v3Server) ClusterVersion() *semver.Version { return s.Version() } -func (s *v2v3Server) Cluster() api.Cluster { return s } -func (s *v2v3Server) Alarms() []*pb.AlarmMember { return nil } +func (s *v2v3Server) ClusterVersion() *semver.Version { return s.Version() } +func (s *v2v3Server) Cluster() api.Cluster { return s } +func (s *v2v3Server) Alarms() []*pb.AlarmMember { return nil } +func (s *v2v3Server) LeaderChangedNotify() <-chan struct{} { return nil } func (s *v2v3Server) Do(ctx context.Context, r pb.Request) (etcdserver.Response, error) { applier := etcdserver.NewApplierV2(s.lg, s.store, nil) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index a4fa259ae..5c5a9fb67 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -185,6 +185,15 @@ type Server interface { ClusterVersion() *semver.Version Cluster() api.Cluster Alarms() []*pb.AlarmMember + + // LeaderChangedNotify returns a channel for application level code to be notified + // when etcd leader changes, this function is intend to be used only in application + // which embed etcd. + // Caution: + // 1. the returned channel is being closed when the leadership changes. + // 2. so the new channel needs to be obtained for each raft term. + // 3. user can loose some consecutive channel changes using this API. + LeaderChangedNotify() <-chan struct{} } // EtcdServer is the production implementation of the Server interface @@ -1743,7 +1752,7 @@ func (s *EtcdServer) getLead() uint64 { return atomic.LoadUint64(&s.lead) } -func (s *EtcdServer) leaderChangedNotify() <-chan struct{} { +func (s *EtcdServer) LeaderChangedNotify() <-chan struct{} { s.leaderChangedMu.RLock() defer s.leaderChangedMu.RUnlock() return s.leaderChanged diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index 4130d8e08..3fa64f741 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -712,7 +712,7 @@ func (s *EtcdServer) linearizableReadLoop() { ctxToSend := make([]byte, 8) id1 := s.reqIDGen.Next() binary.BigEndian.PutUint64(ctxToSend, id1) - leaderChangedNotifier := s.leaderChangedNotify() + leaderChangedNotifier := s.LeaderChangedNotify() select { case <-leaderChangedNotifier: continue