mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: export method EtcdServer.leaderChangedNotify (#12378)
This commit is contained in:
@@ -72,9 +72,10 @@ func (s *fakeServer) UpdateMember(ctx context.Context, updateMemb membership.Mem
|
|||||||
func (s *fakeServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
|
func (s *fakeServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
|
||||||
return nil, fmt.Errorf("PromoteMember not implemented in fakeServer")
|
return nil, fmt.Errorf("PromoteMember not implemented in fakeServer")
|
||||||
}
|
}
|
||||||
func (s *fakeServer) ClusterVersion() *semver.Version { return nil }
|
func (s *fakeServer) ClusterVersion() *semver.Version { return nil }
|
||||||
func (s *fakeServer) Cluster() api.Cluster { return s.cluster }
|
func (s *fakeServer) Cluster() api.Cluster { return s.cluster }
|
||||||
func (s *fakeServer) Alarms() []*pb.AlarmMember { return nil }
|
func (s *fakeServer) Alarms() []*pb.AlarmMember { return nil }
|
||||||
|
func (s *fakeServer) LeaderChangedNotify() <-chan struct{} { return nil }
|
||||||
|
|
||||||
var fakeRaftHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
var fakeRaftHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
w.Write([]byte("test data"))
|
w.Write([]byte("test data"))
|
||||||
|
|||||||
@@ -95,11 +95,12 @@ type fakeServer struct {
|
|||||||
dummyStats
|
dummyStats
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *fakeServer) Leader() types.ID { return types.ID(1) }
|
func (s *fakeServer) Leader() types.ID { return types.ID(1) }
|
||||||
func (s *fakeServer) Alarms() []*etcdserverpb.AlarmMember { return nil }
|
func (s *fakeServer) Alarms() []*etcdserverpb.AlarmMember { return nil }
|
||||||
func (s *fakeServer) Cluster() api.Cluster { return nil }
|
func (s *fakeServer) LeaderChangedNotify() <-chan struct{} { return nil }
|
||||||
func (s *fakeServer) ClusterVersion() *semver.Version { return nil }
|
func (s *fakeServer) Cluster() api.Cluster { return nil }
|
||||||
func (s *fakeServer) RaftHandler() http.Handler { return nil }
|
func (s *fakeServer) ClusterVersion() *semver.Version { return nil }
|
||||||
|
func (s *fakeServer) RaftHandler() http.Handler { return nil }
|
||||||
func (s *fakeServer) Do(ctx context.Context, r etcdserverpb.Request) (rr etcdserver.Response, err error) {
|
func (s *fakeServer) Do(ctx context.Context, r etcdserverpb.Request) (rr etcdserver.Response, err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -113,9 +113,10 @@ func v3MembersToMembership(v3membs []*pb.Member) []*membership.Member {
|
|||||||
return membs
|
return membs
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *v2v3Server) ClusterVersion() *semver.Version { return s.Version() }
|
func (s *v2v3Server) ClusterVersion() *semver.Version { return s.Version() }
|
||||||
func (s *v2v3Server) Cluster() api.Cluster { return s }
|
func (s *v2v3Server) Cluster() api.Cluster { return s }
|
||||||
func (s *v2v3Server) Alarms() []*pb.AlarmMember { return nil }
|
func (s *v2v3Server) Alarms() []*pb.AlarmMember { return nil }
|
||||||
|
func (s *v2v3Server) LeaderChangedNotify() <-chan struct{} { return nil }
|
||||||
|
|
||||||
func (s *v2v3Server) Do(ctx context.Context, r pb.Request) (etcdserver.Response, error) {
|
func (s *v2v3Server) Do(ctx context.Context, r pb.Request) (etcdserver.Response, error) {
|
||||||
applier := etcdserver.NewApplierV2(s.lg, s.store, nil)
|
applier := etcdserver.NewApplierV2(s.lg, s.store, nil)
|
||||||
|
|||||||
@@ -185,6 +185,15 @@ type Server interface {
|
|||||||
ClusterVersion() *semver.Version
|
ClusterVersion() *semver.Version
|
||||||
Cluster() api.Cluster
|
Cluster() api.Cluster
|
||||||
Alarms() []*pb.AlarmMember
|
Alarms() []*pb.AlarmMember
|
||||||
|
|
||||||
|
// LeaderChangedNotify returns a channel for application level code to be notified
|
||||||
|
// when etcd leader changes, this function is intend to be used only in application
|
||||||
|
// which embed etcd.
|
||||||
|
// Caution:
|
||||||
|
// 1. the returned channel is being closed when the leadership changes.
|
||||||
|
// 2. so the new channel needs to be obtained for each raft term.
|
||||||
|
// 3. user can loose some consecutive channel changes using this API.
|
||||||
|
LeaderChangedNotify() <-chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// EtcdServer is the production implementation of the Server interface
|
// EtcdServer is the production implementation of the Server interface
|
||||||
@@ -1743,7 +1752,7 @@ func (s *EtcdServer) getLead() uint64 {
|
|||||||
return atomic.LoadUint64(&s.lead)
|
return atomic.LoadUint64(&s.lead)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *EtcdServer) leaderChangedNotify() <-chan struct{} {
|
func (s *EtcdServer) LeaderChangedNotify() <-chan struct{} {
|
||||||
s.leaderChangedMu.RLock()
|
s.leaderChangedMu.RLock()
|
||||||
defer s.leaderChangedMu.RUnlock()
|
defer s.leaderChangedMu.RUnlock()
|
||||||
return s.leaderChanged
|
return s.leaderChanged
|
||||||
|
|||||||
@@ -712,7 +712,7 @@ func (s *EtcdServer) linearizableReadLoop() {
|
|||||||
ctxToSend := make([]byte, 8)
|
ctxToSend := make([]byte, 8)
|
||||||
id1 := s.reqIDGen.Next()
|
id1 := s.reqIDGen.Next()
|
||||||
binary.BigEndian.PutUint64(ctxToSend, id1)
|
binary.BigEndian.PutUint64(ctxToSend, id1)
|
||||||
leaderChangedNotifier := s.leaderChangedNotify()
|
leaderChangedNotifier := s.LeaderChangedNotify()
|
||||||
select {
|
select {
|
||||||
case <-leaderChangedNotifier:
|
case <-leaderChangedNotifier:
|
||||||
continue
|
continue
|
||||||
|
|||||||
Reference in New Issue
Block a user