Merge getLogger() & Logger() methods.

Author: Piotr Tabor
Date: 2021-03-14 14:05:17 +01:00
Parent: 527c765ece
Commit: 44bd22307e
7 changed files with 56 additions and 60 deletions

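Since the renamed method's body is cut off mid-function by the hunk below (around old line 641), here is a minimal sketch, reconstructed from that hunk plus the obvious closing return, of the single accessor that survives this change; anything beyond what the diff itself shows is an assumption:

// Logger returns the server-wide *zap.Logger. The read is guarded by the
// lgMu read lock, as in the renamed method shown in the hunk below.
func (s *EtcdServer) Logger() *zap.Logger {
	s.lgMu.RLock()
	l := s.lg
	s.lgMu.RUnlock()
	return l
}

Every former s.getLogger() call site in the hunks below now goes through this exported method, and the older unsynchronized Logger() near the bottom of the file, which returned s.lg without taking the lock, is the one being deleted.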

@@ -539,7 +539,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
srv.lessor = lease.NewLessor(
- srv.getLogger(),
+ srv.Logger(),
srv.be,
lease.LessorConfig{
MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
@@ -559,7 +559,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
cfg.Logger.Warn("failed to create token provider", zap.Error(err))
return nil, err
}
- srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
+ srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
kvindex := srv.consistIndex.ConsistentIndex()
srv.lg.Debug("restore consistentIndex",
zap.Uint64("index", kvindex))
@@ -577,7 +577,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
}
}
- srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, srv.consistIndex, tp, int(cfg.BcryptCost))
+ srv.authStore = auth.NewAuthStore(srv.Logger(), srv.be, srv.consistIndex, tp, int(cfg.BcryptCost))
newSrv := srv // since srv == nil in defer if srv is returned as nil
defer func() {
@@ -641,7 +641,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
return srv, nil
}
- func (s *EtcdServer) getLogger() *zap.Logger {
+ func (s *EtcdServer) Logger() *zap.Logger {
s.lgMu.RLock()
l := s.lg
s.lgMu.RUnlock()
@@ -653,7 +653,7 @@ func tickToDur(ticks int, tickMs uint) string {
}
func (s *EtcdServer) adjustTicks() {
- lg := s.getLogger()
+ lg := s.Logger()
clusterN := len(s.cluster.Members())
// single-node fresh start, or single-node recovers from snapshot
@@ -723,7 +723,7 @@ func (s *EtcdServer) Start() {
s.GoAttach(func() { s.adjustTicks() })
s.GoAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
s.GoAttach(s.purgeFile)
- s.GoAttach(func() { monitorFileDescriptor(s.getLogger(), s.stopping) })
+ s.GoAttach(func() { monitorFileDescriptor(s.Logger(), s.stopping) })
s.GoAttach(s.monitorVersions)
s.GoAttach(s.linearizableReadLoop)
s.GoAttach(s.monitorKVHash)
@@ -734,7 +734,7 @@ func (s *EtcdServer) Start() {
// modify a server's fields after it has been sent to Start.
// This function is just used for testing.
func (s *EtcdServer) start() {
- lg := s.getLogger()
+ lg := s.Logger()
if s.Cfg.SnapshotCount == 0 {
lg.Info(
@@ -786,7 +786,7 @@ func (s *EtcdServer) start() {
}
func (s *EtcdServer) purgeFile() {
- lg := s.getLogger()
+ lg := s.Logger()
var dberrc, serrc, werrc <-chan error
var dbdonec, sdonec, wdonec <-chan struct{}
if s.Cfg.MaxSnapFiles > 0 {
@@ -853,7 +853,7 @@ type downgradeEnabledHandler struct {
func (s *EtcdServer) DowngradeEnabledHandler() http.Handler {
return &downgradeEnabledHandler{
- lg: s.getLogger(),
+ lg: s.Logger(),
cluster: s.cluster,
server: s,
}
@@ -890,7 +890,7 @@ func (h *downgradeEnabledHandler) ServeHTTP(w http.ResponseWriter, r *http.Reque
// Process takes a raft message and applies it to the server's raft state
// machine, respecting any timeout of the given context.
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
- lg := s.getLogger()
+ lg := s.Logger()
if s.cluster.IsIDRemoved(types.ID(m.From)) {
lg.Warn(
"rejected Raft message from removed member",
@@ -933,7 +933,7 @@ type raftReadyHandler struct {
}
func (s *EtcdServer) run() {
- lg := s.getLogger()
+ lg := s.Logger()
sn, err := s.r.raftStorage.Snapshot()
if err != nil {
@@ -1128,7 +1128,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
}
applySnapshotInProgress.Inc()
- lg := s.getLogger()
+ lg := s.Logger()
lg.Info(
"applying snapshot",
zap.Uint64("current-snapshot-index", ep.snapi),
@@ -1261,7 +1261,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
}
firsti := apply.entries[0].Index
if firsti > ep.appliedi+1 {
- lg := s.getLogger()
+ lg := s.Logger()
lg.Panic(
"unexpected committed entry index",
zap.Uint64("current-applied-index", ep.appliedi),
@@ -1286,7 +1286,7 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
return
}
- lg := s.getLogger()
+ lg := s.Logger()
lg.Info(
"triggering snapshot",
zap.String("local-member-id", s.ID().String()),
@@ -1316,7 +1316,7 @@ func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) er
now := time.Now()
interval := time.Duration(s.Cfg.TickMs) * time.Millisecond
- lg := s.getLogger()
+ lg := s.Logger()
lg.Info(
"leadership transfer starting",
zap.String("local-member-id", s.ID().String()),
@@ -1346,7 +1346,7 @@ func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) er
// TransferLeadership transfers the leader to the chosen transferee.
func (s *EtcdServer) TransferLeadership() error {
- lg := s.getLogger()
+ lg := s.Logger()
if !s.isLeader() {
lg.Info(
"skipped leadership transfer; local server is not leader",
@@ -1394,7 +1394,7 @@ func (s *EtcdServer) HardStop() {
// Stop terminates the Server and performs any necessary finalization.
// Do and Process cannot be called after Stop has been invoked.
func (s *EtcdServer) Stop() {
- lg := s.getLogger()
+ lg := s.Logger()
if err := s.TransferLeadership(); err != nil {
lg.Warn("leadership transfer failed", zap.String("local-member-id", s.ID().String()), zap.Error(err))
}
@@ -1487,7 +1487,7 @@ func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) ([]*
}
func (s *EtcdServer) mayAddMember(memb membership.Member) error {
- lg := s.getLogger()
+ lg := s.Logger()
if !s.Cfg.StrictReconfigCheck {
return nil
}
@@ -1613,7 +1613,7 @@ func (s *EtcdServer) promoteMember(ctx context.Context, id uint64) ([]*membershi
}
func (s *EtcdServer) mayPromoteMember(id types.ID) error {
- lg := s.getLogger()
+ lg := s.Logger()
err := s.isLearnerReady(uint64(id))
if err != nil {
return err
@@ -1674,7 +1674,7 @@ func (s *EtcdServer) mayRemoveMember(id types.ID) error {
return nil
}
- lg := s.getLogger()
+ lg := s.Logger()
isLearner := s.cluster.IsMemberExist(id) && s.cluster.Member(id).IsLearner
// no need to check quorum when removing non-voting member
if isLearner {
@@ -1798,7 +1798,7 @@ type confChangeResponse struct {
// then waits for it to be applied to the server. It
// will block until the change is performed or there is an error.
func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*membership.Member, error) {
- lg := s.getLogger()
+ lg := s.Logger()
cc.ID = s.reqIDGen.Next()
ch := s.w.Register(cc.ID)
@@ -1864,7 +1864,7 @@ func (s *EtcdServer) publishV3(timeout time.Duration) {
ClientUrls: s.attributes.ClientURLs,
},
}
- lg := s.getLogger()
+ lg := s.Logger()
for {
select {
case <-s.stopping:
@@ -1918,7 +1918,7 @@ func (s *EtcdServer) publishV3(timeout time.Duration) {
// process publish requests through rafthttp
// TODO: Deprecate v2 store in 3.6
func (s *EtcdServer) publish(timeout time.Duration) {
- lg := s.getLogger()
+ lg := s.Logger()
b, err := json.Marshal(s.attributes)
if err != nil {
lg.Panic("failed to marshal JSON", zap.Error(err))
@@ -1973,7 +1973,7 @@ func (s *EtcdServer) publish(timeout time.Duration) {
func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
atomic.AddInt64(&s.inflightSnapshots, 1)
- lg := s.getLogger()
+ lg := s.Logger()
fields := []zap.Field{
zap.String("from", s.ID().String()),
zap.String("to", types.ID(merged.To).String()),
@@ -2039,7 +2039,7 @@ func (s *EtcdServer) apply(
s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})
default:
- lg := s.getLogger()
+ lg := s.Logger()
lg.Panic(
"unknown entry type; must be either EntryNormal or EntryConfChange",
zap.String("type", e.Type.String()),
@@ -2120,7 +2120,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
return
}
- lg := s.getLogger()
+ lg := s.Logger()
lg.Warn(
"message exceeded backend quota; raising alarm",
zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes),
@@ -2148,7 +2148,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
return false, err
}
- lg := s.getLogger()
+ lg := s.Logger()
*confState = *s.r.ApplyConfChange(cc)
switch cc.Type {
case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode:
@@ -2222,7 +2222,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
s.KV().Commit()
s.GoAttach(func() {
- lg := s.getLogger()
+ lg := s.Logger()
d, err := clone.SaveNoCopy()
// TODO: current store will never fail to do a snapshot
@@ -2328,7 +2328,7 @@ func (s *EtcdServer) monitorVersions() {
continue
}
- v := decideClusterVersion(s.getLogger(), getVersions(s.getLogger(), s.cluster, s.id, s.peerRt))
+ v := decideClusterVersion(s.Logger(), getVersions(s.Logger(), s.cluster, s.id, s.peerRt))
if v != nil {
// only keep major.minor version for comparison
v = &semver.Version{
@@ -2356,7 +2356,7 @@ func (s *EtcdServer) monitorVersions() {
}
func (s *EtcdServer) updateClusterVersion(ver string) {
- lg := s.getLogger()
+ lg := s.Logger()
if s.cluster.Version() == nil {
lg.Info(
@@ -2396,7 +2396,7 @@ func (s *EtcdServer) monitorDowngrade() {
if t == 0 {
return
}
- lg := s.getLogger()
+ lg := s.Logger()
for {
select {
case <-time.After(t):
@@ -2415,7 +2415,7 @@ func (s *EtcdServer) monitorDowngrade() {
targetVersion := d.TargetVersion
v := semver.Must(semver.NewVersion(targetVersion))
- if isMatchedVersions(s.getLogger(), v, getVersions(s.getLogger(), s.cluster, s.id, s.peerRt)) {
+ if isMatchedVersions(s.Logger(), v, getVersions(s.Logger(), s.cluster, s.id, s.peerRt)) {
lg.Info("the cluster has been downgraded", zap.String("cluster-version", targetVersion))
ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
if _, err := s.downgradeCancel(ctx); err != nil {
@@ -2492,7 +2492,7 @@ func (s *EtcdServer) GoAttach(f func()) {
defer s.wgMu.RUnlock()
select {
case <-s.stopping:
- lg := s.getLogger()
+ lg := s.Logger()
lg.Warn("server has stopped; skipping GoAttach")
return
default:
@@ -2510,10 +2510,6 @@ func (s *EtcdServer) Alarms() []*pb.AlarmMember {
return s.alarmStore.Get(pb.AlarmType_NONE)
}
- func (s *EtcdServer) Logger() *zap.Logger {
- 	return s.lg
- }
// IsLearner returns if the local member is raft learner
func (s *EtcdServer) IsLearner() bool {
return s.cluster.IsLocalMemberLearner()