From 44bd22307e45f06a16d7ea27c20ff2614483267f Mon Sep 17 00:00:00 2001
From: Piotr Tabor
Date: Sun, 14 Mar 2021 14:05:17 +0100
Subject: [PATCH] Merge getLogger() & Logger() methods.

Rename the private getLogger() accessor to Logger() and drop the old
public Logger() implementation, which read s.lg without taking lgMu.
All call sites now go through the single lock-protected accessor.
---
 server/etcdserver/apply.go          | 14 +++---
 server/etcdserver/apply_v2.go       |  2 +-
 server/etcdserver/corrupt.go        | 10 ++---
 server/etcdserver/quota.go          |  2 +-
 server/etcdserver/server.go         | 70 ++++++++++++++---------------
 server/etcdserver/snapshot_merge.go |  2 +-
 server/etcdserver/v3_server.go      | 16 +++----
 7 files changed, 56 insertions(+), 60 deletions(-)

diff --git a/server/etcdserver/apply.go b/server/etcdserver/apply.go
index 30a12ba24..010450129 100644
--- a/server/etcdserver/apply.go
+++ b/server/etcdserver/apply.go
@@ -136,9 +136,9 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
 	defer func(start time.Time) {
 		success := ar.err == nil || ar.err == mvcc.ErrCompacted
 		applySec.WithLabelValues(v3Version, op, strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
-		warnOfExpensiveRequest(a.s.getLogger(), a.s.Cfg.WarningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
+		warnOfExpensiveRequest(a.s.Logger(), a.s.Cfg.WarningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
 		if !success {
-			warnOfFailedRequest(a.s.getLogger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
+			warnOfFailedRequest(a.s.Logger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
 		}
 	}(time.Now())
 
@@ -243,7 +243,7 @@ func (a *applierV3backend) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.Put
 	// create put tracing if the trace in context is empty
 	if trace.IsEmpty() {
 		trace = traceutil.New("put",
-			a.s.getLogger(),
+			a.s.Logger(),
 			traceutil.Field{Key: "key", Value: string(p.Key)},
 			traceutil.Field{Key: "req_size", Value: proto.Size(p)},
 		)
@@ -420,7 +420,7 @@ func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Ra
 func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
 	trace := traceutil.Get(ctx)
 	if trace.IsEmpty() {
-		trace = traceutil.New("transaction", a.s.getLogger())
+		trace = traceutil.New("transaction", a.s.Logger())
 		ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
 	}
 	isWrite := !isTxnReadonly(rt)
@@ -606,7 +606,7 @@ func (a *applierV3backend) applyTxn(ctx context.Context, txn mvcc.TxnWrite, rt *
 		reqs = rt.Failure
 	}
 
-	lg := a.s.getLogger()
+	lg := a.s.Logger()
 	for i, req := range reqs {
 		respi := tresp.Responses[i].Response
 		switch tv := req.Request.(type) {
@@ -654,7 +654,7 @@ func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.Com
 	resp := &pb.CompactionResponse{}
 	resp.Header = &pb.ResponseHeader{}
 	trace := traceutil.New("compact",
-		a.s.getLogger(),
+		a.s.Logger(),
 		traceutil.Field{Key: "revision", Value: compaction.Revision},
 	)
 
@@ -698,7 +698,7 @@ func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error)
 	resp := &pb.AlarmResponse{}
 	oldCount := len(a.s.alarmStore.Get(ar.Alarm))
 
-	lg := a.s.getLogger()
+	lg := a.s.Logger()
 	switch ar.Action {
 	case pb.AlarmRequest_GET:
 		resp.Alarms = a.s.alarmStore.Get(ar.Alarm)
diff --git a/server/etcdserver/apply_v2.go b/server/etcdserver/apply_v2.go
index 73c735d65..e322c29a6 100644
--- a/server/etcdserver/apply_v2.go
+++ b/server/etcdserver/apply_v2.go
@@ -120,7 +120,7 @@ func (s *EtcdServer) applyV2Request(r *RequestV2) (resp Response) {
 	defer func(start time.Time) {
 		success := resp.Err == nil
 		applySec.WithLabelValues(v2Version, r.Method, strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
-		warnOfExpensiveRequest(s.getLogger(), s.Cfg.WarningApplyDuration, start, stringer, nil, nil)
+		warnOfExpensiveRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, stringer, nil, nil)
 	}(time.Now())
 
 	switch r.Method {
diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go
index b784fad36..2c2489567 100644
--- a/server/etcdserver/corrupt.go
+++ b/server/etcdserver/corrupt.go
@@ -41,7 +41,7 @@ func (s *EtcdServer) CheckInitialHashKV() error {
 		return nil
 	}
 
-	lg := s.getLogger()
+	lg := s.Logger()
 
 	lg.Info(
 		"starting initial corruption check",
@@ -126,7 +126,7 @@ func (s *EtcdServer) monitorKVHash() {
 		return
 	}
 
-	lg := s.getLogger()
+	lg := s.Logger()
 	lg.Info(
 		"enabled corruption checking",
 		zap.String("local-member-id", s.ID().String()),
@@ -149,7 +149,7 @@ func (s *EtcdServer) monitorKVHash() {
 }
 
 func (s *EtcdServer) checkHashKV() error {
-	lg := s.getLogger()
+	lg := s.Logger()
 
 	h, rev, crev, err := s.kv.HashByRev(0)
 	if err != nil {
@@ -268,7 +268,7 @@ func (s *EtcdServer) getPeerHashKVs(rev int64) []*peerHashKVResp {
 		peers = append(peers, peerInfo{id: m.ID, eps: m.PeerURLs})
 	}
 
-	lg := s.getLogger()
+	lg := s.Logger()
 
 	var resps []*peerHashKVResp
 	for _, p := range peers {
@@ -345,7 +345,7 @@ type hashKVHandler struct {
 }
 
 func (s *EtcdServer) HashKVHandler() http.Handler {
-	return &hashKVHandler{lg: s.getLogger(), server: s}
+	return &hashKVHandler{lg: s.Logger(), server: s}
 }
 
 func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
diff --git a/server/etcdserver/quota.go b/server/etcdserver/quota.go
index eb8967d62..33c06e619 100644
--- a/server/etcdserver/quota.go
+++ b/server/etcdserver/quota.go
@@ -72,7 +72,7 @@ var (
 
 // NewBackendQuota creates a quota layer with the given storage limit.
 func NewBackendQuota(s *EtcdServer, name string) Quota {
-	lg := s.getLogger()
+	lg := s.Logger()
 	quotaBackendBytes.Set(float64(s.Cfg.QuotaBackendBytes))
 
 	if s.Cfg.QuotaBackendBytes < 0 {
diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go
index 17a473867..646a4c693 100644
--- a/server/etcdserver/server.go
+++ b/server/etcdserver/server.go
@@ -539,7 +539,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
 	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
 	srv.lessor = lease.NewLessor(
-		srv.getLogger(),
+		srv.Logger(),
 		srv.be,
 		lease.LessorConfig{
 			MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
@@ -559,7 +559,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 		cfg.Logger.Warn("failed to create token provider", zap.Error(err))
 		return nil, err
 	}
-	srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
+	srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
 
 	kvindex := srv.consistIndex.ConsistentIndex()
 	srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex))
@@ -577,7 +577,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 		}
 	}
 
-	srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, srv.consistIndex, tp, int(cfg.BcryptCost))
+	srv.authStore = auth.NewAuthStore(srv.Logger(), srv.be, srv.consistIndex, tp, int(cfg.BcryptCost))
 
 	newSrv := srv // since srv == nil in defer if srv is returned as nil
 	defer func() {
@@ -641,7 +641,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 	return srv, nil
 }
 
-func (s *EtcdServer) getLogger() *zap.Logger {
+func (s *EtcdServer) Logger() *zap.Logger {
 	s.lgMu.RLock()
 	l := s.lg
 	s.lgMu.RUnlock()
@@ -653,7 +653,7 @@ func tickToDur(ticks int, tickMs uint) string {
 }
 
 func (s *EtcdServer) adjustTicks() {
-	lg := s.getLogger()
+	lg := s.Logger()
 	clusterN := len(s.cluster.Members())
 
 	// single-node fresh start, or single-node recovers from snapshot
@@ -723,7 +723,7 @@ func (s *EtcdServer) Start() {
 	s.GoAttach(func() { s.adjustTicks() })
 	s.GoAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
 	s.GoAttach(s.purgeFile)
-	s.GoAttach(func() { monitorFileDescriptor(s.getLogger(), s.stopping) })
+	s.GoAttach(func() { monitorFileDescriptor(s.Logger(), s.stopping) })
 	s.GoAttach(s.monitorVersions)
 	s.GoAttach(s.linearizableReadLoop)
 	s.GoAttach(s.monitorKVHash)
@@ -734,7 +734,7 @@ func (s *EtcdServer) Start() {
 // modify a server's fields after it has been sent to Start.
 // This function is just used for testing.
 func (s *EtcdServer) start() {
-	lg := s.getLogger()
+	lg := s.Logger()
 
 	if s.Cfg.SnapshotCount == 0 {
 		lg.Info(
@@ -786,7 +786,7 @@ func (s *EtcdServer) start() {
 }
 
 func (s *EtcdServer) purgeFile() {
-	lg := s.getLogger()
+	lg := s.Logger()
 	var dberrc, serrc, werrc <-chan error
 	var dbdonec, sdonec, wdonec <-chan struct{}
 	if s.Cfg.MaxSnapFiles > 0 {
@@ -853,7 +853,7 @@ type downgradeEnabledHandler struct {
 
 func (s *EtcdServer) DowngradeEnabledHandler() http.Handler {
 	return &downgradeEnabledHandler{
-		lg:      s.getLogger(),
+		lg:      s.Logger(),
 		cluster: s.cluster,
 		server:  s,
 	}
@@ -890,7 +890,7 @@ func (h *downgradeEnabledHandler) ServeHTTP(w http.ResponseWriter, r *http.Reque
 // Process takes a raft message and applies it to the server's raft state
 // machine, respecting any timeout of the given context.
 func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
-	lg := s.getLogger()
+	lg := s.Logger()
 	if s.cluster.IsIDRemoved(types.ID(m.From)) {
 		lg.Warn(
 			"rejected Raft message from removed member",
@@ -933,7 +933,7 @@ type raftReadyHandler struct {
 }
 
 func (s *EtcdServer) run() {
-	lg := s.getLogger()
+	lg := s.Logger()
 
 	sn, err := s.r.raftStorage.Snapshot()
 	if err != nil {
@@ -1128,7 +1128,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 	}
 	applySnapshotInProgress.Inc()
 
-	lg := s.getLogger()
+	lg := s.Logger()
 	lg.Info(
 		"applying snapshot",
 		zap.Uint64("current-snapshot-index", ep.snapi),
@@ -1261,7 +1261,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
 	}
 	firsti := apply.entries[0].Index
 	if firsti > ep.appliedi+1 {
-		lg := s.getLogger()
+		lg := s.Logger()
 		lg.Panic(
 			"unexpected committed entry index",
 			zap.Uint64("current-applied-index", ep.appliedi),
@@ -1286,7 +1286,7 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
 		return
 	}
 
-	lg := s.getLogger()
+	lg := s.Logger()
 	lg.Info(
 		"triggering snapshot",
 		zap.String("local-member-id", s.ID().String()),
@@ -1316,7 +1316,7 @@ func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) er
 	now := time.Now()
 	interval := time.Duration(s.Cfg.TickMs) * time.Millisecond
 
-	lg := s.getLogger()
+	lg := s.Logger()
 	lg.Info(
 		"leadership transfer starting",
 		zap.String("local-member-id", s.ID().String()),
@@ -1346,7 +1346,7 @@ func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) er
 
 // TransferLeadership transfers the leader to the chosen transferee.
 func (s *EtcdServer) TransferLeadership() error {
-	lg := s.getLogger()
+	lg := s.Logger()
 	if !s.isLeader() {
 		lg.Info(
 			"skipped leadership transfer; local server is not leader",
@@ -1394,7 +1394,7 @@ func (s *EtcdServer) HardStop() {
 // Stop terminates the Server and performs any necessary finalization.
 // Do and Process cannot be called after Stop has been invoked.
 func (s *EtcdServer) Stop() {
-	lg := s.getLogger()
+	lg := s.Logger()
 	if err := s.TransferLeadership(); err != nil {
 		lg.Warn("leadership transfer failed", zap.String("local-member-id", s.ID().String()), zap.Error(err))
 	}
@@ -1487,7 +1487,7 @@ func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) ([]*
 }
 
 func (s *EtcdServer) mayAddMember(memb membership.Member) error {
-	lg := s.getLogger()
+	lg := s.Logger()
 	if !s.Cfg.StrictReconfigCheck {
 		return nil
 	}
@@ -1613,7 +1613,7 @@ func (s *EtcdServer) promoteMember(ctx context.Context, id uint64) ([]*membershi
 }
 
 func (s *EtcdServer) mayPromoteMember(id types.ID) error {
-	lg := s.getLogger()
+	lg := s.Logger()
 	err := s.isLearnerReady(uint64(id))
 	if err != nil {
 		return err
@@ -1674,7 +1674,7 @@ func (s *EtcdServer) mayRemoveMember(id types.ID) error {
 		return nil
 	}
 
-	lg := s.getLogger()
+	lg := s.Logger()
 	isLearner := s.cluster.IsMemberExist(id) && s.cluster.Member(id).IsLearner
 	// no need to check quorum when removing non-voting member
 	if isLearner {
@@ -1798,7 +1798,7 @@ type confChangeResponse struct {
 // then waits for it to be applied to the server. It
 // will block until the change is performed or there is an error.
 func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*membership.Member, error) {
-	lg := s.getLogger()
+	lg := s.Logger()
 	cc.ID = s.reqIDGen.Next()
 	ch := s.w.Register(cc.ID)
 
@@ -1864,7 +1864,7 @@ func (s *EtcdServer) publishV3(timeout time.Duration) {
 			ClientUrls: s.attributes.ClientURLs,
 		},
 	}
-	lg := s.getLogger()
+	lg := s.Logger()
 	for {
 		select {
 		case <-s.stopping:
@@ -1918,7 +1918,7 @@ func (s *EtcdServer) publishV3(timeout time.Duration) {
 // process publish requests through rafthttp
 // TODO: Deprecate v2 store in 3.6
 func (s *EtcdServer) publish(timeout time.Duration) {
-	lg := s.getLogger()
+	lg := s.Logger()
 	b, err := json.Marshal(s.attributes)
 	if err != nil {
 		lg.Panic("failed to marshal JSON", zap.Error(err))
@@ -1973,7 +1973,7 @@ func (s *EtcdServer) publish(timeout time.Duration) {
 func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
 	atomic.AddInt64(&s.inflightSnapshots, 1)
 
-	lg := s.getLogger()
+	lg := s.Logger()
 	fields := []zap.Field{
 		zap.String("from", s.ID().String()),
 		zap.String("to", types.ID(merged.To).String()),
@@ -2039,7 +2039,7 @@ func (s *EtcdServer) apply(
 			s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})
 
 		default:
-			lg := s.getLogger()
+			lg := s.Logger()
 			lg.Panic(
 				"unknown entry type; must be either EntryNormal or EntryConfChange",
 				zap.String("type", e.Type.String()),
@@ -2120,7 +2120,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 		return
 	}
 
-	lg := s.getLogger()
+	lg := s.Logger()
 	lg.Warn(
 		"message exceeded backend quota; raising alarm",
 		zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes),
@@ -2148,7 +2148,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 		return false, err
 	}
 
-	lg := s.getLogger()
+	lg := s.Logger()
 	*confState = *s.r.ApplyConfChange(cc)
 	switch cc.Type {
 	case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode:
@@ -2222,7 +2222,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 	s.KV().Commit()
 
 	s.GoAttach(func() {
-		lg := s.getLogger()
+		lg := s.Logger()
 
 		d, err := clone.SaveNoCopy()
 		// TODO: current store will never fail to do a snapshot
@@ -2328,7 +2328,7 @@ func (s *EtcdServer) monitorVersions() {
 			continue
 		}
 
-		v := decideClusterVersion(s.getLogger(), getVersions(s.getLogger(), s.cluster, s.id, s.peerRt))
+		v := decideClusterVersion(s.Logger(), getVersions(s.Logger(), s.cluster, s.id, s.peerRt))
 		if v != nil {
 			// only keep major.minor version for comparison
 			v = &semver.Version{
@@ -2356,7 +2356,7 @@ func (s *EtcdServer) monitorVersions() {
 }
 
 func (s *EtcdServer) updateClusterVersion(ver string) {
-	lg := s.getLogger()
+	lg := s.Logger()
 
 	if s.cluster.Version() == nil {
 		lg.Info(
@@ -2396,7 +2396,7 @@ func (s *EtcdServer) monitorDowngrade() {
 	if t == 0 {
 		return
 	}
-	lg := s.getLogger()
+	lg := s.Logger()
 	for {
 		select {
 		case <-time.After(t):
@@ -2415,7 +2415,7 @@ func (s *EtcdServer) monitorDowngrade() {
 			targetVersion := d.TargetVersion
 			v := semver.Must(semver.NewVersion(targetVersion))
-			if isMatchedVersions(s.getLogger(), v, getVersions(s.getLogger(), s.cluster, s.id, s.peerRt)) {
+			if isMatchedVersions(s.Logger(), v, getVersions(s.Logger(), s.cluster, s.id, s.peerRt)) {
 				lg.Info("the cluster has been downgraded", zap.String("cluster-version", targetVersion))
 				ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
 				if _, err := s.downgradeCancel(ctx); err != nil {
@@ -2492,7 +2492,7 @@ func (s *EtcdServer) GoAttach(f func()) {
 	defer s.wgMu.RUnlock()
 	select {
 	case <-s.stopping:
-		lg := s.getLogger()
+		lg := s.Logger()
 		lg.Warn("server has stopped; skipping GoAttach")
 		return
 	default:
@@ -2510,10 +2510,6 @@ func (s *EtcdServer) Alarms() []*pb.AlarmMember {
 	return s.alarmStore.Get(pb.AlarmType_NONE)
 }
 
-func (s *EtcdServer) Logger() *zap.Logger {
-	return s.lg
-}
-
 // IsLearner returns if the local member is raft learner
 func (s *EtcdServer) IsLearner() bool {
 	return s.cluster.IsLocalMemberLearner()
diff --git a/server/etcdserver/snapshot_merge.go b/server/etcdserver/snapshot_merge.go
index c3bd0290b..72d10c179 100644
--- a/server/etcdserver/snapshot_merge.go
+++ b/server/etcdserver/snapshot_merge.go
@@ -29,7 +29,7 @@ import (
 // a snapshot of v2 store inside raft.Snapshot as []byte, a snapshot of v3 KV in the top level message
 // as ReadCloser.
 func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message {
-	lg := s.getLogger()
+	lg := s.Logger()
 	// get a snapshot of v2 store as []byte
 	clone := s.v2store.Clone()
 	d, err := clone.SaveNoCopy()
diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go
index 3fa64f741..45d79948c 100644
--- a/server/etcdserver/v3_server.go
+++ b/server/etcdserver/v3_server.go
@@ -93,7 +93,7 @@ type Authenticator interface {
 
 func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
 	trace := traceutil.New("range",
-		s.getLogger(),
+		s.Logger(),
 		traceutil.Field{Key: "range_begin", Value: string(r.Key)},
 		traceutil.Field{Key: "range_end", Value: string(r.RangeEnd)},
 	)
@@ -102,7 +102,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
 	var resp *pb.RangeResponse
 	var err error
 	defer func(start time.Time) {
-		warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), s.Cfg.WarningApplyDuration, start, r, resp, err)
+		warnOfExpensiveReadOnlyRangeRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, r, resp, err)
 		if resp != nil {
 			trace.AddField(
 				traceutil.Field{Key: "response_count", Value: len(resp.Kvs)},
@@ -151,7 +151,7 @@ func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest)
 func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
 	if isTxnReadonly(r) {
 		trace := traceutil.New("transaction",
-			s.getLogger(),
+			s.Logger(),
 			traceutil.Field{Key: "read_only", Value: true},
 		)
 		ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
@@ -169,7 +169,7 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
 	}
 
 	defer func(start time.Time) {
-		warnOfExpensiveReadOnlyTxnRequest(s.getLogger(), s.Cfg.WarningApplyDuration, start, r, resp, err)
+		warnOfExpensiveReadOnlyTxnRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, r, resp, err)
 		trace.LogIfLong(traceThreshold)
 	}(time.Now())
 
@@ -426,7 +426,7 @@ func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest
 		return nil, err
 	}
 
-	lg := s.getLogger()
+	lg := s.Logger()
 
 	var resp proto.Message
 	for {
@@ -723,7 +723,7 @@ func (s *EtcdServer) linearizableReadLoop() {
 		// as a single loop can unlock multiple reads, it is not very useful
 		// to propagate the trace from Txn or Range.
- trace := traceutil.New("linearizableReadLoop", s.getLogger()) + trace := traceutil.New("linearizableReadLoop", s.Logger()) nextnr := newNotifier() s.readMu.Lock() @@ -731,7 +731,7 @@ func (s *EtcdServer) linearizableReadLoop() { s.readNotifier = nextnr s.readMu.Unlock() - lg := s.getLogger() + lg := s.Logger() cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) if err := s.r.ReadIndex(cctx, ctxToSend); err != nil { cancel() @@ -895,7 +895,7 @@ func (s *EtcdServer) downgradeValidate(ctx context.Context, v string) (*pb.Downg func (s *EtcdServer) downgradeEnable(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) { // validate downgrade capability before starting downgrade v := r.Version - lg := s.getLogger() + lg := s.Logger() if resp, err := s.downgradeValidate(ctx, v); err != nil { lg.Warn("reject downgrade request", zap.Error(err)) return resp, err