From c94782cd55fb44df43574505db9ac1c1b7d49c00 Mon Sep 17 00:00:00 2001 From: Jingyi Hu Date: Tue, 11 Feb 2020 08:54:14 -0800 Subject: [PATCH] etcdserver: remove capnslog (#11611) remove capnslog from etcdserver pkg, except etcdserver/api. --- etcdserver/apply.go | 42 +- etcdserver/apply_v2.go | 9 +- etcdserver/backend.go | 19 +- etcdserver/cluster_util.go | 172 +++---- etcdserver/corrupt.go | 231 +++------ etcdserver/metrics.go | 18 +- etcdserver/quota.go | 40 +- etcdserver/raft.go | 141 ++---- etcdserver/server.go | 927 ++++++++++++----------------------- etcdserver/snapshot_merge.go | 43 +- etcdserver/storage.go | 24 +- etcdserver/util.go | 28 +- etcdserver/v3_server.go | 46 +- 13 files changed, 556 insertions(+), 1184 deletions(-) diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 960124e31..25e50ebbb 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -553,31 +553,19 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat case *pb.RequestOp_RequestRange: resp, err := a.Range(context.TODO(), txn, tv.RequestRange) if err != nil { - if lg != nil { - lg.Panic("unexpected error during txn", zap.Error(err)) - } else { - plog.Panicf("unexpected error during txn: %v", err) - } + lg.Panic("unexpected error during txn", zap.Error(err)) } respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp case *pb.RequestOp_RequestPut: resp, _, err := a.Put(txn, tv.RequestPut) if err != nil { - if lg != nil { - lg.Panic("unexpected error during txn", zap.Error(err)) - } else { - plog.Panicf("unexpected error during txn: %v", err) - } + lg.Panic("unexpected error during txn", zap.Error(err)) } respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp case *pb.RequestOp_RequestDeleteRange: resp, err := a.DeleteRange(txn, tv.RequestDeleteRange) if err != nil { - if lg != nil { - lg.Panic("unexpected error during txn", zap.Error(err)) - } else { - plog.Panicf("unexpected error during txn: %v", err) - } + lg.Panic("unexpected error during txn", zap.Error(err)) } respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp case *pb.RequestOp_RequestTxn: @@ -655,22 +643,14 @@ func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) break } - if lg != nil { - lg.Warn("alarm raised", zap.String("alarm", m.Alarm.String()), zap.String("from", types.ID(m.MemberID).String())) - } else { - plog.Warningf("alarm %v raised by peer %s", m.Alarm, types.ID(m.MemberID)) - } + lg.Warn("alarm raised", zap.String("alarm", m.Alarm.String()), zap.String("from", types.ID(m.MemberID).String())) switch m.Alarm { case pb.AlarmType_CORRUPT: a.s.applyV3 = newApplierV3Corrupt(a) case pb.AlarmType_NOSPACE: a.s.applyV3 = newApplierV3Capped(a) default: - if lg != nil { - lg.Warn("unimplemented alarm activation", zap.String("alarm", fmt.Sprintf("%+v", m))) - } else { - plog.Errorf("unimplemented alarm activation (%+v)", m) - } + lg.Warn("unimplemented alarm activation", zap.String("alarm", fmt.Sprintf("%+v", m))) } case pb.AlarmRequest_DEACTIVATE: m := a.s.alarmStore.Deactivate(types.ID(ar.MemberID), ar.Alarm) @@ -686,18 +666,10 @@ func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) switch m.Alarm { case pb.AlarmType_NOSPACE, pb.AlarmType_CORRUPT: // TODO: check kv hash before deactivating CORRUPT? 
- if lg != nil { - lg.Warn("alarm disarmed", zap.String("alarm", m.Alarm.String()), zap.String("from", types.ID(m.MemberID).String())) - } else { - plog.Infof("alarm disarmed %+v", ar) - } + lg.Warn("alarm disarmed", zap.String("alarm", m.Alarm.String()), zap.String("from", types.ID(m.MemberID).String())) a.s.applyV3 = a.s.newApplierV3() default: - if lg != nil { - lg.Warn("unimplemented alarm deactivation", zap.String("alarm", fmt.Sprintf("%+v", m))) - } else { - plog.Errorf("unimplemented alarm deactivation (%+v)", m) - } + lg.Warn("unimplemented alarm deactivation", zap.String("alarm", fmt.Sprintf("%+v", m))) } default: return nil, nil diff --git a/etcdserver/apply_v2.go b/etcdserver/apply_v2.go index 796e3806f..26956b5dc 100644 --- a/etcdserver/apply_v2.go +++ b/etcdserver/apply_v2.go @@ -36,6 +36,9 @@ type ApplierV2 interface { } func NewApplierV2(lg *zap.Logger, s v2store.Store, c *membership.RaftCluster) ApplierV2 { + if lg == nil { + lg = zap.NewNop() + } return &applierV2store{lg: lg, store: s, cluster: c} } @@ -77,11 +80,7 @@ func (a *applierV2store) Put(r *RequestV2) Response { id := membership.MustParseMemberIDFromKey(path.Dir(r.Path)) var attr membership.Attributes if err := json.Unmarshal([]byte(r.Val), &attr); err != nil { - if a.lg != nil { - a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err)) - } else { - plog.Panicf("unmarshal %s should never fail: %v", r.Val, err) - } + a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err)) } if a.cluster != nil { a.cluster.UpdateAttributes(id, attr) diff --git a/etcdserver/backend.go b/etcdserver/backend.go index 01ba19256..67fd21063 100644 --- a/etcdserver/backend.go +++ b/etcdserver/backend.go @@ -75,22 +75,15 @@ func openBackend(cfg ServerConfig) backend.Backend { select { case be := <-beOpened: - if cfg.Logger != nil { - cfg.Logger.Info("opened backend db", zap.String("path", fn), zap.Duration("took", time.Since(now))) - } + cfg.Logger.Info("opened backend db", zap.String("path", fn), zap.Duration("took", time.Since(now))) return be case <-time.After(10 * time.Second): - if cfg.Logger != nil { - cfg.Logger.Info( - "db file is flocked by another process, or taking too long", - zap.String("path", fn), - zap.Duration("took", time.Since(now)), - ) - } else { - plog.Warningf("another etcd process is using %q and holds the file lock, or loading backend file is taking >10 seconds", fn) - plog.Warningf("waiting for it to exit before starting...") - } + cfg.Logger.Info( + "db file is flocked by another process, or taking too long", + zap.String("path", fn), + zap.Duration("took", time.Since(now)), + ) } return <-beOpened diff --git a/etcdserver/cluster_util.go b/etcdserver/cluster_util.go index f92706cb7..7b7c43eae 100644 --- a/etcdserver/cluster_util.go +++ b/etcdserver/cluster_util.go @@ -63,6 +63,9 @@ func GetClusterFromRemotePeers(lg *zap.Logger, urls []string, rt http.RoundTripp // If logerr is true, it prints out more error messages. 
func getClusterFromRemotePeers(lg *zap.Logger, urls []string, timeout time.Duration, logerr bool, rt http.RoundTripper) (*membership.RaftCluster, error) { + if lg == nil { + lg = zap.NewNop() + } cc := &http.Client{ Transport: rt, Timeout: timeout, @@ -72,11 +75,7 @@ func getClusterFromRemotePeers(lg *zap.Logger, urls []string, timeout time.Durat resp, err := cc.Get(addr) if err != nil { if logerr { - if lg != nil { - lg.Warn("failed to get cluster response", zap.String("address", addr), zap.Error(err)) - } else { - plog.Warningf("could not get cluster response from %s: %v", u, err) - } + lg.Warn("failed to get cluster response", zap.String("address", addr), zap.Error(err)) } continue } @@ -84,38 +83,26 @@ func getClusterFromRemotePeers(lg *zap.Logger, urls []string, timeout time.Durat resp.Body.Close() if err != nil { if logerr { - if lg != nil { - lg.Warn("failed to read body of cluster response", zap.String("address", addr), zap.Error(err)) - } else { - plog.Warningf("could not read the body of cluster response: %v", err) - } + lg.Warn("failed to read body of cluster response", zap.String("address", addr), zap.Error(err)) } continue } var membs []*membership.Member if err = json.Unmarshal(b, &membs); err != nil { if logerr { - if lg != nil { - lg.Warn("failed to unmarshal cluster response", zap.String("address", addr), zap.Error(err)) - } else { - plog.Warningf("could not unmarshal cluster response: %v", err) - } + lg.Warn("failed to unmarshal cluster response", zap.String("address", addr), zap.Error(err)) } continue } id, err := types.IDFromString(resp.Header.Get("X-Etcd-Cluster-ID")) if err != nil { if logerr { - if lg != nil { - lg.Warn( - "failed to parse cluster ID", - zap.String("address", addr), - zap.String("header", resp.Header.Get("X-Etcd-Cluster-ID")), - zap.Error(err), - ) - } else { - plog.Warningf("could not parse the cluster ID from cluster res: %v", err) - } + lg.Warn( + "failed to parse cluster ID", + zap.String("address", addr), + zap.String("header", resp.Header.Get("X-Etcd-Cluster-ID")), + zap.Error(err), + ) } continue } @@ -164,11 +151,7 @@ func getVersions(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt } ver, err := getVersion(lg, m, rt) if err != nil { - if lg != nil { - lg.Warn("failed to get version", zap.String("remote-member-id", m.ID.String()), zap.Error(err)) - } else { - plog.Warningf("cannot get the version of member %s (%v)", m.ID, err) - } + lg.Warn("failed to get version", zap.String("remote-member-id", m.ID.String()), zap.Error(err)) vers[m.ID.String()] = nil } else { vers[m.ID.String()] = ver @@ -190,30 +173,21 @@ func decideClusterVersion(lg *zap.Logger, vers map[string]*version.Versions) *se } v, err := semver.NewVersion(ver.Server) if err != nil { - if lg != nil { - lg.Warn( - "failed to parse server version of remote member", - zap.String("remote-member-id", mid), - zap.String("remote-member-version", ver.Server), - zap.Error(err), - ) - } else { - plog.Errorf("cannot understand the version of member %s (%v)", mid, err) - } + lg.Warn( + "failed to parse server version of remote member", + zap.String("remote-member-id", mid), + zap.String("remote-member-version", ver.Server), + zap.Error(err), + ) return nil } if lv.LessThan(*v) { - if lg != nil { - lg.Warn( - "leader found higher-versioned member", - zap.String("local-member-version", lv.String()), - zap.String("remote-member-id", mid), - zap.String("remote-member-version", ver.Server), - ) - } else { - plog.Warningf("the local etcd version %s is not up-to-date", lv.String()) - 
plog.Warningf("member %s has a higher version %s", mid, ver.Server) - } + lg.Warn( + "leader found higher-versioned member", + zap.String("local-member-version", lv.String()), + zap.String("remote-member-id", mid), + zap.String("remote-member-version", ver.Server), + ) } if cv == nil { cv = v @@ -253,42 +227,30 @@ func isCompatibleWithVers(lg *zap.Logger, vers map[string]*version.Versions, loc } clusterv, err := semver.NewVersion(v.Cluster) if err != nil { - if lg != nil { - lg.Warn( - "failed to parse cluster version of remote member", - zap.String("remote-member-id", id), - zap.String("remote-member-cluster-version", v.Cluster), - zap.Error(err), - ) - } else { - plog.Errorf("cannot understand the cluster version of member %s (%v)", id, err) - } + lg.Warn( + "failed to parse cluster version of remote member", + zap.String("remote-member-id", id), + zap.String("remote-member-cluster-version", v.Cluster), + zap.Error(err), + ) continue } if clusterv.LessThan(*minV) { - if lg != nil { - lg.Warn( - "cluster version of remote member is not compatible; too low", - zap.String("remote-member-id", id), - zap.String("remote-member-cluster-version", clusterv.String()), - zap.String("minimum-cluster-version-supported", minV.String()), - ) - } else { - plog.Warningf("the running cluster version(%v) is lower than the minimal cluster version(%v) supported", clusterv.String(), minV.String()) - } + lg.Warn( + "cluster version of remote member is not compatible; too low", + zap.String("remote-member-id", id), + zap.String("remote-member-cluster-version", clusterv.String()), + zap.String("minimum-cluster-version-supported", minV.String()), + ) return false } if maxV.LessThan(*clusterv) { - if lg != nil { - lg.Warn( - "cluster version of remote member is not compatible; too high", - zap.String("remote-member-id", id), - zap.String("remote-member-cluster-version", clusterv.String()), - zap.String("minimum-cluster-version-supported", minV.String()), - ) - } else { - plog.Warningf("the running cluster version(%v) is higher than the maximum cluster version(%v) supported", clusterv.String(), maxV.String()) - } + lg.Warn( + "cluster version of remote member is not compatible; too high", + zap.String("remote-member-id", id), + zap.String("remote-member-cluster-version", clusterv.String()), + zap.String("minimum-cluster-version-supported", minV.String()), + ) return false } ok = true @@ -311,46 +273,34 @@ func getVersion(lg *zap.Logger, m *membership.Member, rt http.RoundTripper) (*ve addr := u + "/version" resp, err = cc.Get(addr) if err != nil { - if lg != nil { - lg.Warn( - "failed to reach the peer URL", - zap.String("address", addr), - zap.String("remote-member-id", m.ID.String()), - zap.Error(err), - ) - } else { - plog.Warningf("failed to reach the peerURL(%s) of member %s (%v)", u, m.ID, err) - } + lg.Warn( + "failed to reach the peer URL", + zap.String("address", addr), + zap.String("remote-member-id", m.ID.String()), + zap.Error(err), + ) continue } var b []byte b, err = ioutil.ReadAll(resp.Body) resp.Body.Close() if err != nil { - if lg != nil { - lg.Warn( - "failed to read body of response", - zap.String("address", addr), - zap.String("remote-member-id", m.ID.String()), - zap.Error(err), - ) - } else { - plog.Warningf("failed to read out the response body from the peerURL(%s) of member %s (%v)", u, m.ID, err) - } + lg.Warn( + "failed to read body of response", + zap.String("address", addr), + zap.String("remote-member-id", m.ID.String()), + zap.Error(err), + ) continue } var vers version.Versions if err 
= json.Unmarshal(b, &vers); err != nil { - if lg != nil { - lg.Warn( - "failed to unmarshal response", - zap.String("address", addr), - zap.String("remote-member-id", m.ID.String()), - zap.Error(err), - ) - } else { - plog.Warningf("failed to unmarshal the response body got from the peerURL(%s) of member %s (%v)", u, m.ID, err) - } + lg.Warn( + "failed to unmarshal response", + zap.String("address", addr), + zap.String("remote-member-id", m.ID.String()), + zap.Error(err), + ) continue } return &vers, nil diff --git a/etcdserver/corrupt.go b/etcdserver/corrupt.go index 0c687bfcd..7950529f6 100644 --- a/etcdserver/corrupt.go +++ b/etcdserver/corrupt.go @@ -39,15 +39,11 @@ func (s *EtcdServer) CheckInitialHashKV() error { lg := s.getLogger() - if lg != nil { - lg.Info( - "starting initial corruption check", - zap.String("local-member-id", s.ID().String()), - zap.Duration("timeout", s.Cfg.ReqTimeout()), - ) - } else { - plog.Infof("%s starting initial corruption check with timeout %v...", s.ID(), s.Cfg.ReqTimeout()) - } + lg.Info( + "starting initial corruption check", + zap.String("local-member-id", s.ID().String()), + zap.Duration("timeout", s.Cfg.ReqTimeout()), + ) h, rev, crev, err := s.kv.HashByRev(0) if err != nil { @@ -72,18 +68,10 @@ func (s *EtcdServer) CheckInitialHashKV() error { if h != p.resp.Hash { if crev == p.resp.CompactRevision { - if lg != nil { - lg.Warn("found different hash values from remote peer", fields...) - } else { - plog.Errorf("%s's hash %d != %s's hash %d (revision %d, peer revision %d, compact revision %d)", s.ID(), h, peerID, p.resp.Hash, rev, p.resp.Header.Revision, crev) - } + lg.Warn("found different hash values from remote peer", fields...) mismatch++ } else { - if lg != nil { - lg.Warn("found different compact revision values from remote peer", fields...) - } else { - plog.Warningf("%s cannot check hash of peer(%s): peer has a different compact revision %d (revision:%d)", s.ID(), peerID, p.resp.CompactRevision, rev) - } + lg.Warn("found different compact revision values from remote peer", fields...) 
} } @@ -93,35 +81,27 @@ func (s *EtcdServer) CheckInitialHashKV() error { if p.err != nil { switch p.err { case rpctypes.ErrFutureRev: - if lg != nil { - lg.Warn( - "cannot fetch hash from slow remote peer", - zap.String("local-member-id", s.ID().String()), - zap.Int64("local-member-revision", rev), - zap.Int64("local-member-compact-revision", crev), - zap.Uint32("local-member-hash", h), - zap.String("remote-peer-id", p.id.String()), - zap.Strings("remote-peer-endpoints", p.eps), - zap.Error(err), - ) - } else { - plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: peer is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error()) - } + lg.Warn( + "cannot fetch hash from slow remote peer", + zap.String("local-member-id", s.ID().String()), + zap.Int64("local-member-revision", rev), + zap.Int64("local-member-compact-revision", crev), + zap.Uint32("local-member-hash", h), + zap.String("remote-peer-id", p.id.String()), + zap.Strings("remote-peer-endpoints", p.eps), + zap.Error(err), + ) case rpctypes.ErrCompacted: - if lg != nil { - lg.Warn( - "cannot fetch hash from remote peer; local member is behind", - zap.String("local-member-id", s.ID().String()), - zap.Int64("local-member-revision", rev), - zap.Int64("local-member-compact-revision", crev), - zap.Uint32("local-member-hash", h), - zap.String("remote-peer-id", p.id.String()), - zap.Strings("remote-peer-endpoints", p.eps), - zap.Error(err), - ) - } else { - plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: local node is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error()) - } + lg.Warn( + "cannot fetch hash from remote peer; local member is behind", + zap.String("local-member-id", s.ID().String()), + zap.Int64("local-member-revision", rev), + zap.Int64("local-member-compact-revision", crev), + zap.Uint32("local-member-hash", h), + zap.String("remote-peer-id", p.id.String()), + zap.Strings("remote-peer-endpoints", p.eps), + zap.Error(err), + ) } } } @@ -129,14 +109,10 @@ func (s *EtcdServer) CheckInitialHashKV() error { return fmt.Errorf("%s found data inconsistency with peers", s.ID()) } - if lg != nil { - lg.Info( - "initial corruption checking passed; no corruption", - zap.String("local-member-id", s.ID().String()), - ) - } else { - plog.Infof("%s succeeded on initial corruption checking: no corruption", s.ID()) - } + lg.Info( + "initial corruption checking passed; no corruption", + zap.String("local-member-id", s.ID().String()), + ) return nil } @@ -147,15 +123,11 @@ func (s *EtcdServer) monitorKVHash() { } lg := s.getLogger() - if lg != nil { - lg.Info( - "enabled corruption checking", - zap.String("local-member-id", s.ID().String()), - zap.Duration("interval", t), - ) - } else { - plog.Infof("enabled corruption checking with %s interval", t) - } + lg.Info( + "enabled corruption checking", + zap.String("local-member-id", s.ID().String()), + zap.Duration("interval", t), + ) for { select { @@ -167,11 +139,7 @@ func (s *EtcdServer) monitorKVHash() { continue } if err := s.checkHashKV(); err != nil { - if lg != nil { - lg.Warn("failed to check hash KV", zap.Error(err)) - } else { - plog.Debugf("check hash kv failed %v", err) - } + lg.Warn("failed to check hash KV", zap.Error(err)) } } } @@ -214,19 +182,15 @@ func (s *EtcdServer) checkHashKV() error { } if h2 != h && rev2 == rev && crev == crev2 { - if lg != nil { - lg.Warn( - "found hash mismatch", - zap.Int64("revision-1", rev), - zap.Int64("compact-revision-1", crev), - zap.Uint32("hash-1", h), - zap.Int64("revision-2", rev2), - 
zap.Int64("compact-revision-2", crev2), - zap.Uint32("hash-2", h2), - ) - } else { - plog.Warningf("mismatched hashes %d and %d for revision %d", h, h2, rev) - } + lg.Warn( + "found hash mismatch", + zap.Int64("revision-1", rev), + zap.Int64("compact-revision-1", crev), + zap.Uint32("hash-1", h), + zap.Int64("revision-2", rev2), + zap.Int64("compact-revision-2", crev2), + zap.Uint32("hash-2", h2), + ) mismatch(uint64(s.ID())) } @@ -238,63 +202,36 @@ func (s *EtcdServer) checkHashKV() error { // leader expects follower's latest revision less than or equal to leader's if p.resp.Header.Revision > rev2 { - if lg != nil { - lg.Warn( - "revision from follower must be less than or equal to leader's", - zap.Int64("leader-revision", rev2), - zap.Int64("follower-revision", p.resp.Header.Revision), - zap.String("follower-peer-id", types.ID(id).String()), - ) - } else { - plog.Warningf( - "revision %d from member %v, expected at most %d", - p.resp.Header.Revision, - types.ID(id), - rev2) - } + lg.Warn( + "revision from follower must be less than or equal to leader's", + zap.Int64("leader-revision", rev2), + zap.Int64("follower-revision", p.resp.Header.Revision), + zap.String("follower-peer-id", types.ID(id).String()), + ) mismatch(id) } // leader expects follower's latest compact revision less than or equal to leader's if p.resp.CompactRevision > crev2 { - if lg != nil { - lg.Warn( - "compact revision from follower must be less than or equal to leader's", - zap.Int64("leader-compact-revision", crev2), - zap.Int64("follower-compact-revision", p.resp.CompactRevision), - zap.String("follower-peer-id", types.ID(id).String()), - ) - } else { - plog.Warningf( - "compact revision %d from member %v, expected at most %d", - p.resp.CompactRevision, - types.ID(id), - crev2, - ) - } + lg.Warn( + "compact revision from follower must be less than or equal to leader's", + zap.Int64("leader-compact-revision", crev2), + zap.Int64("follower-compact-revision", p.resp.CompactRevision), + zap.String("follower-peer-id", types.ID(id).String()), + ) mismatch(id) } // follower's compact revision is leader's old one, then hashes must match if p.resp.CompactRevision == crev && p.resp.Hash != h { - if lg != nil { - lg.Warn( - "same compact revision then hashes must match", - zap.Int64("leader-compact-revision", crev2), - zap.Uint32("leader-hash", h), - zap.Int64("follower-compact-revision", p.resp.CompactRevision), - zap.Uint32("follower-hash", p.resp.Hash), - zap.String("follower-peer-id", types.ID(id).String()), - ) - } else { - plog.Warningf( - "hash %d at revision %d from member %v, expected hash %d", - p.resp.Hash, - rev, - types.ID(id), - h, - ) - } + lg.Warn( + "same compact revision then hashes must match", + zap.Int64("leader-compact-revision", crev2), + zap.Uint32("leader-hash", h), + zap.Int64("follower-compact-revision", p.resp.CompactRevision), + zap.Uint32("follower-hash", p.resp.Hash), + zap.String("follower-peer-id", types.ID(id).String()), + ) mismatch(id) } } @@ -332,17 +269,13 @@ func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*peerHashKVResp) { Endpoints: p.eps, }) if cerr != nil { - if lg != nil { - lg.Warn( - "failed to create client to peer URL", - zap.String("local-member-id", s.ID().String()), - zap.String("remote-peer-id", p.id.String()), - zap.Strings("remote-peer-endpoints", p.eps), - zap.Error(cerr), - ) - } else { - plog.Warningf("%s failed to create client to peer %q for hash checking (%q)", s.ID(), p.eps, cerr.Error()) - } + lg.Warn( + "failed to create client to peer URL", + 
zap.String("local-member-id", s.ID().String()), + zap.String("remote-peer-id", p.id.String()), + zap.Strings("remote-peer-endpoints", p.eps), + zap.Error(cerr), + ) continue } @@ -356,17 +289,13 @@ func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*peerHashKVResp) { resps = append(resps, &peerHashKVResp{id: p.id, eps: p.eps, resp: resp, err: nil}) break } - if lg != nil { - lg.Warn( - "failed hash kv request", - zap.String("local-member-id", s.ID().String()), - zap.Int64("requested-revision", rev), - zap.String("remote-peer-endpoint", c), - zap.Error(cerr), - ) - } else { - plog.Warningf("%s hash-kv error %q on peer %q with revision %d", s.ID(), cerr.Error(), c, rev) - } + lg.Warn( + "failed hash kv request", + zap.String("local-member-id", s.ID().String()), + zap.Int64("requested-revision", rev), + zap.String("remote-peer-endpoint", c), + zap.Error(cerr), + ) } cli.Close() diff --git a/etcdserver/metrics.go b/etcdserver/metrics.go index e0c0cde85..cde5e7ebc 100644 --- a/etcdserver/metrics.go +++ b/etcdserver/metrics.go @@ -189,28 +189,16 @@ func monitorFileDescriptor(lg *zap.Logger, done <-chan struct{}) { for { used, err := runtime.FDUsage() if err != nil { - if lg != nil { - lg.Warn("failed to get file descriptor usage", zap.Error(err)) - } else { - plog.Errorf("cannot monitor file descriptor usage (%v)", err) - } + lg.Warn("failed to get file descriptor usage", zap.Error(err)) return } limit, err := runtime.FDLimit() if err != nil { - if lg != nil { - lg.Warn("failed to get file descriptor limit", zap.Error(err)) - } else { - plog.Errorf("cannot monitor file descriptor usage (%v)", err) - } + lg.Warn("failed to get file descriptor limit", zap.Error(err)) return } if used >= limit/5*4 { - if lg != nil { - lg.Warn("80% of file descriptors are used", zap.Uint64("used", used), zap.Uint64("limit", limit)) - } else { - plog.Warningf("80%% of the file descriptor limit is used [used = %d, limit = %d]", used, limit) - } + lg.Warn("80% of file descriptors are used", zap.Uint64("used", used), zap.Uint64("limit", limit)) } select { case <-ticker.C: diff --git a/etcdserver/quota.go b/etcdserver/quota.go index 6d70430e7..a3536a138 100644 --- a/etcdserver/quota.go +++ b/etcdserver/quota.go @@ -78,15 +78,11 @@ func NewBackendQuota(s *EtcdServer, name string) Quota { if s.Cfg.QuotaBackendBytes < 0 { // disable quotas if negative quotaLogOnce.Do(func() { - if lg != nil { - lg.Info( - "disabled backend quota", - zap.String("quota-name", name), - zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes), - ) - } else { - plog.Warningf("disabling backend quota") - } + lg.Info( + "disabled backend quota", + zap.String("quota-name", name), + zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes), + ) }) return &passthroughQuota{} } @@ -109,27 +105,21 @@ func NewBackendQuota(s *EtcdServer, name string) Quota { quotaLogOnce.Do(func() { if s.Cfg.QuotaBackendBytes > MaxQuotaBytes { - if lg != nil { - lg.Warn( - "quota exceeds the maximum value", - zap.String("quota-name", name), - zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes), - zap.String("quota-size", humanize.Bytes(uint64(s.Cfg.QuotaBackendBytes))), - zap.Int64("quota-maximum-size-bytes", MaxQuotaBytes), - zap.String("quota-maximum-size", maxQuotaSize), - ) - } else { - plog.Warningf("backend quota %v exceeds maximum recommended quota %v", s.Cfg.QuotaBackendBytes, MaxQuotaBytes) - } - } - if lg != nil { - lg.Info( - "enabled backend quota", + lg.Warn( + "quota exceeds the maximum value", zap.String("quota-name", name), 
zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes), zap.String("quota-size", humanize.Bytes(uint64(s.Cfg.QuotaBackendBytes))), + zap.Int64("quota-maximum-size-bytes", MaxQuotaBytes), + zap.String("quota-maximum-size", maxQuotaSize), ) } + lg.Info( + "enabled backend quota", + zap.String("quota-name", name), + zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes), + zap.String("quota-size", humanize.Bytes(uint64(s.Cfg.QuotaBackendBytes))), + ) }) return &backendQuota{s, s.Cfg.QuotaBackendBytes} } diff --git a/etcdserver/raft.go b/etcdserver/raft.go index c0fe97905..754df17a2 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -198,11 +198,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { select { case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]: case <-time.After(internalTimeout): - if r.lg != nil { - r.lg.Warn("timed out sending read state", zap.Duration("timeout", internalTimeout)) - } else { - plog.Warningf("timed out sending read state") - } + r.lg.Warn("timed out sending read state", zap.Duration("timeout", internalTimeout)) case <-r.stopped: return } @@ -233,11 +229,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { // gofail: var raftBeforeSave struct{} if err := r.storage.Save(rd.HardState, rd.Entries); err != nil { - if r.lg != nil { - r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err)) - } else { - plog.Fatalf("raft save state and entries error: %v", err) - } + r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err)) } if !raft.IsEmptyHardState(rd.HardState) { proposalsCommitted.Set(float64(rd.HardState.Commit)) @@ -247,22 +239,14 @@ func (r *raftNode) start(rh *raftReadyHandler) { if !raft.IsEmptySnap(rd.Snapshot) { // gofail: var raftBeforeSaveSnap struct{} if err := r.storage.SaveSnap(rd.Snapshot); err != nil { - if r.lg != nil { - r.lg.Fatal("failed to save Raft snapshot", zap.Error(err)) - } else { - plog.Fatalf("raft save snapshot error: %v", err) - } + r.lg.Fatal("failed to save Raft snapshot", zap.Error(err)) } // etcdserver now claim the snapshot has been persisted onto the disk notifyc <- struct{}{} // gofail: var raftAfterSaveSnap struct{} r.raftStorage.ApplySnapshot(rd.Snapshot) - if r.lg != nil { - r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index)) - } else { - plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index) - } + r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index)) // gofail: var raftAfterApplySnap struct{} } @@ -359,18 +343,13 @@ func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message { ok, exceed := r.td.Observe(ms[i].To) if !ok { // TODO: limit request rate. 
- if r.lg != nil { - r.lg.Warn( - "leader failed to send out heartbeat on time; took too long, leader is overloaded likely from slow disk", - zap.String("to", fmt.Sprintf("%x", ms[i].To)), - zap.Duration("heartbeat-interval", r.heartbeat), - zap.Duration("expected-duration", 2*r.heartbeat), - zap.Duration("exceeded-duration", exceed), - ) - } else { - plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v, to %x)", r.heartbeat, exceed, ms[i].To) - plog.Warningf("server is likely overloaded") - } + r.lg.Warn( + "leader failed to send out heartbeat on time; took too long, leader is overloaded likely from slow disk", + zap.String("to", fmt.Sprintf("%x", ms[i].To)), + zap.Duration("heartbeat-interval", r.heartbeat), + zap.Duration("expected-duration", 2*r.heartbeat), + zap.Duration("exceeded-duration", exceed), + ) heartbeatSendFailures.Inc() } } @@ -392,11 +371,7 @@ func (r *raftNode) onStop() { r.ticker.Stop() r.transport.Stop() if err := r.storage.Close(); err != nil { - if r.lg != nil { - r.lg.Panic("failed to close Raft storage", zap.Error(err)) - } else { - plog.Panicf("raft close storage error: %v", err) - } + r.lg.Panic("failed to close Raft storage", zap.Error(err)) } close(r.done) } @@ -432,35 +407,23 @@ func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id }, ) if w, err = wal.Create(cfg.Logger, cfg.WALDir(), metadata); err != nil { - if cfg.Logger != nil { - cfg.Logger.Panic("failed to create WAL", zap.Error(err)) - } else { - plog.Panicf("create wal error: %v", err) - } + cfg.Logger.Panic("failed to create WAL", zap.Error(err)) } peers := make([]raft.Peer, len(ids)) for i, id := range ids { var ctx []byte ctx, err = json.Marshal((*cl).Member(id)) if err != nil { - if cfg.Logger != nil { - cfg.Logger.Panic("failed to marshal member", zap.Error(err)) - } else { - plog.Panicf("marshal member should never fail: %v", err) - } + cfg.Logger.Panic("failed to marshal member", zap.Error(err)) } peers[i] = raft.Peer{ID: uint64(id), Context: ctx} } id = member.ID - if cfg.Logger != nil { - cfg.Logger.Info( - "starting local member", - zap.String("local-member-id", id.String()), - zap.String("cluster-id", cl.ID().String()), - ) - } else { - plog.Infof("starting member %s in cluster %s", id, cl.ID()) - } + cfg.Logger.Info( + "starting local member", + zap.String("local-member-id", id.String()), + zap.String("cluster-id", cl.ID().String()), + ) s = raft.NewMemoryStorage() c := &raft.Config{ ID: uint64(id), @@ -502,16 +465,12 @@ func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *member } w, id, cid, st, ents := readWAL(cfg.Logger, cfg.WALDir(), walsnap) - if cfg.Logger != nil { - cfg.Logger.Info( - "restarting local member", - zap.String("cluster-id", cid.String()), - zap.String("local-member-id", id.String()), - zap.Uint64("commit-index", st.Commit), - ) - } else { - plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit) - } + cfg.Logger.Info( + "restarting local member", + zap.String("cluster-id", cid.String()), + zap.String("local-member-id", id.String()), + zap.Uint64("commit-index", st.Commit), + ) cl := membership.NewCluster(cfg.Logger, "") cl.SetID(id, cid) s := raft.NewMemoryStorage() @@ -560,16 +519,12 @@ func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types // discard the previously uncommitted entries for i, ent := range ents { if ent.Index > st.Commit { - if cfg.Logger != nil { - cfg.Logger.Info( - "discarding uncommitted WAL entries", - 
zap.Uint64("entry-index", ent.Index), - zap.Uint64("commit-index-from-wal", st.Commit), - zap.Int("number-of-discarded-entries", len(ents)-i), - ) - } else { - plog.Infof("discarding %d uncommitted WAL entries ", len(ents)-i) - } + cfg.Logger.Info( + "discarding uncommitted WAL entries", + zap.Uint64("entry-index", ent.Index), + zap.Uint64("commit-index-from-wal", st.Commit), + zap.Int("number-of-discarded-entries", len(ents)-i), + ) ents = ents[:i] break } @@ -588,26 +543,18 @@ func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types // force commit newly appended entries err := w.Save(raftpb.HardState{}, toAppEnts) if err != nil { - if cfg.Logger != nil { - cfg.Logger.Fatal("failed to save hard state and entries", zap.Error(err)) - } else { - plog.Fatalf("%v", err) - } + cfg.Logger.Fatal("failed to save hard state and entries", zap.Error(err)) } if len(ents) != 0 { st.Commit = ents[len(ents)-1].Index } - if cfg.Logger != nil { - cfg.Logger.Info( - "forcing restart member", - zap.String("cluster-id", cid.String()), - zap.String("local-member-id", id.String()), - zap.Uint64("commit-index", st.Commit), - ) - } else { - plog.Printf("forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit) - } + cfg.Logger.Info( + "forcing restart member", + zap.String("cluster-id", cid.String()), + zap.String("local-member-id", id.String()), + zap.Uint64("commit-index", st.Commit), + ) cl := membership.NewCluster(cfg.Logger, "") cl.SetID(id, cid) @@ -670,11 +617,7 @@ func getIDs(lg *zap.Logger, snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 case raftpb.ConfChangeUpdateNode: // do nothing default: - if lg != nil { - lg.Panic("unknown ConfChange Type", zap.String("type", cc.Type.String())) - } else { - plog.Panicf("ConfChange Type should be either ConfChangeAddNode or ConfChangeRemoveNode!") - } + lg.Panic("unknown ConfChange Type", zap.String("type", cc.Type.String())) } } sids := make(types.Uint64Slice, 0, len(ids)) @@ -710,11 +653,7 @@ func createConfigChangeEnts(lg *zap.Logger, ids []uint64, self uint64, term, ind } ctx, err := json.Marshal(m) if err != nil { - if lg != nil { - lg.Panic("failed to marshal member", zap.Error(err)) - } else { - plog.Panicf("marshal member should never fail: %v", err) - } + lg.Panic("failed to marshal member", zap.Error(err)) } cc := &raftpb.ConfChange{ Type: raftpb.ConfChangeAddNode, diff --git a/etcdserver/server.go b/etcdserver/server.go index f37a401b7..846a76694 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -60,7 +60,6 @@ import ( "go.etcd.io/etcd/wal" "github.com/coreos/go-semver/semver" - "github.com/coreos/pkg/capnslog" humanize "github.com/dustin/go-humanize" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -104,8 +103,6 @@ const ( ) var ( - plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "etcdserver") - storeMemberAttributeRegexp = regexp.MustCompile(path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes")) ) @@ -297,17 +294,13 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { ) if cfg.MaxRequestBytes > recommendedMaxRequestBytes { - if cfg.Logger != nil { - cfg.Logger.Warn( - "exceeded recommended request limit", - zap.Uint("max-request-bytes", cfg.MaxRequestBytes), - zap.String("max-request-size", humanize.Bytes(uint64(cfg.MaxRequestBytes))), - zap.Int("recommended-request-bytes", recommendedMaxRequestBytes), - zap.String("recommended-request-size", humanize.Bytes(uint64(recommendedMaxRequestBytes))), - ) - } else { - 
plog.Warningf("MaxRequestBytes %v exceeds maximum recommended size %v", cfg.MaxRequestBytes, recommendedMaxRequestBytes) - } + cfg.Logger.Warn( + "exceeded recommended request limit", + zap.Uint("max-request-bytes", cfg.MaxRequestBytes), + zap.String("max-request-size", humanize.Bytes(uint64(cfg.MaxRequestBytes))), + zap.Int("recommended-request-bytes", recommendedMaxRequestBytes), + zap.String("recommended-request-size", humanize.Bytes(uint64(recommendedMaxRequestBytes))), + ) } if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil { @@ -317,15 +310,11 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { haveWAL := wal.Exist(cfg.WALDir()) if err = fileutil.TouchDirAll(cfg.SnapDir()); err != nil { - if cfg.Logger != nil { - cfg.Logger.Fatal( - "failed to create snapshot directory", - zap.String("path", cfg.SnapDir()), - zap.Error(err), - ) - } else { - plog.Fatalf("create snapshot directory error: %v", err) - } + cfg.Logger.Fatal( + "failed to create snapshot directory", + zap.String("path", cfg.SnapDir()), + zap.Error(err), + ) } ss := snap.New(cfg.Logger, cfg.SnapDir()) @@ -420,14 +409,10 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { } if cfg.ShouldDiscover() { - if cfg.Logger != nil { - cfg.Logger.Warn( - "discovery token is ignored since cluster already initialized; valid logs are found", - zap.String("wal-dir", cfg.WALDir()), - ) - } else { - plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir()) - } + cfg.Logger.Warn( + "discovery token is ignored since cluster already initialized; valid logs are found", + zap.String("wal-dir", cfg.WALDir()), + ) } snapshot, err = ss.Load() if err != nil && err != snap.ErrNoSnapshot { @@ -435,40 +420,26 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { } if snapshot != nil { if err = st.Recovery(snapshot.Data); err != nil { - if cfg.Logger != nil { - cfg.Logger.Panic("failed to recover from snapshot") - } else { - plog.Panicf("recovered store from snapshot error: %v", err) - } + cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err)) } - if cfg.Logger != nil { - cfg.Logger.Info( - "recovered v2 store from snapshot", - zap.Uint64("snapshot-index", snapshot.Metadata.Index), - zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))), - ) - } else { - plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index) - } + cfg.Logger.Info( + "recovered v2 store from snapshot", + zap.Uint64("snapshot-index", snapshot.Metadata.Index), + zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))), + ) if be, err = recoverSnapshotBackend(cfg, be, *snapshot); err != nil { - if cfg.Logger != nil { - cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err)) - } else { - plog.Panicf("recovering backend from snapshot error: %v", err) - } - } - if cfg.Logger != nil { - s1, s2 := be.Size(), be.SizeInUse() - cfg.Logger.Info( - "recovered v3 backend from snapshot", - zap.Int64("backend-size-bytes", s1), - zap.String("backend-size", humanize.Bytes(uint64(s1))), - zap.Int64("backend-size-in-use-bytes", s2), - zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))), - ) + cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err)) } + s1, s2 := be.Size(), be.SizeInUse() + cfg.Logger.Info( + "recovered v3 backend from snapshot", + zap.Int64("backend-size-bytes", s1), + zap.String("backend-size", humanize.Bytes(uint64(s1))), + 
zap.Int64("backend-size-in-use-bytes", s2), + zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))), + ) } if !cfg.ForceNewCluster { @@ -552,14 +523,10 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { if kvindex != 0 { return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", bepath, kvindex, snapshot.Metadata.Index) } - if cfg.Logger != nil { - cfg.Logger.Warn( - "consistent index was never saved", - zap.Uint64("snapshot-index", snapshot.Metadata.Index), - ) - } else { - plog.Warningf("consistent index never saved (snapshot index=%d)", snapshot.Metadata.Index) - } + cfg.Logger.Warn( + "consistent index was never saved", + zap.Uint64("snapshot-index", snapshot.Metadata.Index), + ) } } newSrv := srv // since srv == nil in defer if srv is returned as nil @@ -578,11 +545,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { }, ) if err != nil { - if cfg.Logger != nil { - cfg.Logger.Warn("failed to create token provider", zap.Error(err)) - } else { - plog.Errorf("failed to create token provider: %s", err) - } + cfg.Logger.Warn("failed to create token provider", zap.Error(err)) return nil, err } srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, tp, int(cfg.BcryptCost)) @@ -658,31 +621,23 @@ func (s *EtcdServer) adjustTicks() { // single-node fresh start, or single-node recovers from snapshot if clusterN == 1 { ticks := s.Cfg.ElectionTicks - 1 - if lg != nil { - lg.Info( - "started as single-node; fast-forwarding election ticks", - zap.String("local-member-id", s.ID().String()), - zap.Int("forward-ticks", ticks), - zap.String("forward-duration", tickToDur(ticks, s.Cfg.TickMs)), - zap.Int("election-ticks", s.Cfg.ElectionTicks), - zap.String("election-timeout", tickToDur(s.Cfg.ElectionTicks, s.Cfg.TickMs)), - ) - } else { - plog.Infof("%s as single-node; fast-forwarding %d ticks (election ticks %d)", s.ID(), ticks, s.Cfg.ElectionTicks) - } + lg.Info( + "started as single-node; fast-forwarding election ticks", + zap.String("local-member-id", s.ID().String()), + zap.Int("forward-ticks", ticks), + zap.String("forward-duration", tickToDur(ticks, s.Cfg.TickMs)), + zap.Int("election-ticks", s.Cfg.ElectionTicks), + zap.String("election-timeout", tickToDur(s.Cfg.ElectionTicks, s.Cfg.TickMs)), + ) s.r.advanceTicks(ticks) return } if !s.Cfg.InitialElectionTickAdvance { - if lg != nil { - lg.Info("skipping initial election tick advance", zap.Int("election-ticks", s.Cfg.ElectionTicks)) - } + lg.Info("skipping initial election tick advance", zap.Int("election-ticks", s.Cfg.ElectionTicks)) return } - if lg != nil { - lg.Info("starting initial election tick advance", zap.Int("election-ticks", s.Cfg.ElectionTicks)) - } + lg.Info("starting initial election tick advance", zap.Int("election-ticks", s.Cfg.ElectionTicks)) // retry up to "rafthttp.ConnReadTimeout", which is 5-sec // until peer connection reports; otherwise: @@ -705,19 +660,15 @@ func (s *EtcdServer) adjustTicks() { // adjust ticks, in case slow leader message receive ticks := s.Cfg.ElectionTicks - 2 - if lg != nil { - lg.Info( - "initialized peer connections; fast-forwarding election ticks", - zap.String("local-member-id", s.ID().String()), - zap.Int("forward-ticks", ticks), - zap.String("forward-duration", tickToDur(ticks, s.Cfg.TickMs)), - zap.Int("election-ticks", s.Cfg.ElectionTicks), - zap.String("election-timeout", tickToDur(s.Cfg.ElectionTicks, s.Cfg.TickMs)), - zap.Int("active-remote-members", peerN), - ) - } else { - plog.Infof("%s initialized peer 
connection; fast-forwarding %d ticks (election ticks %d) with %d active peer(s)", s.ID(), ticks, s.Cfg.ElectionTicks, peerN) - } + lg.Info( + "initialized peer connections; fast-forwarding election ticks", + zap.String("local-member-id", s.ID().String()), + zap.Int("forward-ticks", ticks), + zap.String("forward-duration", tickToDur(ticks, s.Cfg.TickMs)), + zap.Int("election-ticks", s.Cfg.ElectionTicks), + zap.String("election-timeout", tickToDur(s.Cfg.ElectionTicks, s.Cfg.TickMs)), + zap.Int("active-remote-members", peerN), + ) s.r.advanceTicks(ticks) return @@ -747,25 +698,19 @@ func (s *EtcdServer) start() { lg := s.getLogger() if s.Cfg.SnapshotCount == 0 { - if lg != nil { - lg.Info( - "updating snapshot-count to default", - zap.Uint64("given-snapshot-count", s.Cfg.SnapshotCount), - zap.Uint64("updated-snapshot-count", DefaultSnapshotCount), - ) - } else { - plog.Infof("set snapshot count to default %d", DefaultSnapshotCount) - } + lg.Info( + "updating snapshot-count to default", + zap.Uint64("given-snapshot-count", s.Cfg.SnapshotCount), + zap.Uint64("updated-snapshot-count", DefaultSnapshotCount), + ) s.Cfg.SnapshotCount = DefaultSnapshotCount } if s.Cfg.SnapshotCatchUpEntries == 0 { - if lg != nil { - lg.Info( - "updating snapshot catch-up entries to default", - zap.Uint64("given-snapshot-catchup-entries", s.Cfg.SnapshotCatchUpEntries), - zap.Uint64("updated-snapshot-catchup-entries", DefaultSnapshotCatchUpEntries), - ) - } + lg.Info( + "updating snapshot catch-up entries to default", + zap.Uint64("given-snapshot-catchup-entries", s.Cfg.SnapshotCatchUpEntries), + zap.Uint64("updated-snapshot-catchup-entries", DefaultSnapshotCatchUpEntries), + ) s.Cfg.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries } @@ -779,29 +724,21 @@ func (s *EtcdServer) start() { s.readNotifier = newNotifier() s.leaderChanged = make(chan struct{}) if s.ClusterVersion() != nil { - if lg != nil { - lg.Info( - "starting etcd server", - zap.String("local-member-id", s.ID().String()), - zap.String("local-server-version", version.Version), - zap.String("cluster-id", s.Cluster().ID().String()), - zap.String("cluster-version", version.Cluster(s.ClusterVersion().String())), - ) - } else { - plog.Infof("starting server... [version: %v, cluster version: %v]", version.Version, version.Cluster(s.ClusterVersion().String())) - } + lg.Info( + "starting etcd server", + zap.String("local-member-id", s.ID().String()), + zap.String("local-server-version", version.Version), + zap.String("cluster-id", s.Cluster().ID().String()), + zap.String("cluster-version", version.Cluster(s.ClusterVersion().String())), + ) membership.ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(s.ClusterVersion().String())}).Set(1) } else { - if lg != nil { - lg.Info( - "starting etcd server", - zap.String("local-member-id", s.ID().String()), - zap.String("local-server-version", version.Version), - zap.String("cluster-version", "to_be_decided"), - ) - } else { - plog.Infof("starting server... 
[version: %v, cluster version: to_be_decided]", version.Version) - } + lg.Info( + "starting etcd server", + zap.String("local-member-id", s.ID().String()), + zap.String("local-server-version", version.Version), + zap.String("cluster-version", "to_be_decided"), + ) } // TODO: if this is an empty log, writes all peer infos @@ -810,36 +747,24 @@ func (s *EtcdServer) start() { } func (s *EtcdServer) purgeFile() { + lg := s.getLogger() var dberrc, serrc, werrc <-chan error var dbdonec, sdonec, wdonec <-chan struct{} if s.Cfg.MaxSnapFiles > 0 { - dbdonec, dberrc = fileutil.PurgeFileWithDoneNotify(s.getLogger(), s.Cfg.SnapDir(), "snap.db", s.Cfg.MaxSnapFiles, purgeFileInterval, s.stopping) - sdonec, serrc = fileutil.PurgeFileWithDoneNotify(s.getLogger(), s.Cfg.SnapDir(), "snap", s.Cfg.MaxSnapFiles, purgeFileInterval, s.stopping) + dbdonec, dberrc = fileutil.PurgeFileWithDoneNotify(lg, s.Cfg.SnapDir(), "snap.db", s.Cfg.MaxSnapFiles, purgeFileInterval, s.stopping) + sdonec, serrc = fileutil.PurgeFileWithDoneNotify(lg, s.Cfg.SnapDir(), "snap", s.Cfg.MaxSnapFiles, purgeFileInterval, s.stopping) } if s.Cfg.MaxWALFiles > 0 { - wdonec, werrc = fileutil.PurgeFileWithDoneNotify(s.getLogger(), s.Cfg.WALDir(), "wal", s.Cfg.MaxWALFiles, purgeFileInterval, s.stopping) + wdonec, werrc = fileutil.PurgeFileWithDoneNotify(lg, s.Cfg.WALDir(), "wal", s.Cfg.MaxWALFiles, purgeFileInterval, s.stopping) } - lg := s.getLogger() select { case e := <-dberrc: - if lg != nil { - lg.Fatal("failed to purge snap db file", zap.Error(e)) - } else { - plog.Fatalf("failed to purge snap db file %v", e) - } + lg.Fatal("failed to purge snap db file", zap.Error(e)) case e := <-serrc: - if lg != nil { - lg.Fatal("failed to purge snap file", zap.Error(e)) - } else { - plog.Fatalf("failed to purge snap file %v", e) - } + lg.Fatal("failed to purge snap file", zap.Error(e)) case e := <-werrc: - if lg != nil { - lg.Fatal("failed to purge wal file", zap.Error(e)) - } else { - plog.Fatalf("failed to purge wal file %v", e) - } + lg.Fatal("failed to purge wal file", zap.Error(e)) case <-s.stopping: if dbdonec != nil { <-dbdonec @@ -876,16 +801,13 @@ func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() // Process takes a raft message and applies it to the server's raft state // machine, respecting any timeout of the given context. 
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { + lg := s.getLogger() if s.cluster.IsIDRemoved(types.ID(m.From)) { - if lg := s.getLogger(); lg != nil { - lg.Warn( - "rejected Raft message from removed member", - zap.String("local-member-id", s.ID().String()), - zap.String("removed-member-id", types.ID(m.From).String()), - ) - } else { - plog.Warningf("reject message from removed member %s", types.ID(m.From).String()) - } + lg.Warn( + "rejected Raft message from removed member", + zap.String("local-member-id", s.ID().String()), + zap.String("removed-member-id", types.ID(m.From).String()), + ) return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member") } if m.Type == raftpb.MsgApp { @@ -926,11 +848,7 @@ func (s *EtcdServer) run() { sn, err := s.r.raftStorage.Snapshot() if err != nil { - if lg != nil { - lg.Panic("failed to get snapshot from Raft storage", zap.Error(err)) - } else { - plog.Panicf("get snapshot from raft storage error: %v", err) - } + lg.Panic("failed to get snapshot from Raft storage", zap.Error(err)) } // asynchronously accept apply packets, dispatch progress in-order @@ -1068,15 +986,11 @@ func (s *EtcdServer) run() { if lerr == nil { leaseExpired.Inc() } else { - if lg != nil { - lg.Warn( - "failed to revoke lease", - zap.String("lease-id", fmt.Sprintf("%016x", lid)), - zap.Error(lerr), - ) - } else { - plog.Warningf("failed to revoke %016x (%q)", lid, lerr.Error()) - } + lg.Warn( + "failed to revoke lease", + zap.String("lease-id", fmt.Sprintf("%016x", lid)), + zap.Error(lerr), + ) } <-c @@ -1084,13 +998,8 @@ func (s *EtcdServer) run() { } }) case err := <-s.errorc: - if lg != nil { - lg.Warn("server error", zap.Error(err)) - lg.Warn("data-dir used by this member must be removed") - } else { - plog.Errorf("%s", err) - plog.Infof("the data-dir used by this member must be removed.") - } + lg.Warn("server error", zap.Error(err)) + lg.Warn("data-dir used by this member must be removed") return case <-getSyncC(): if s.v2store.HasTTLKeys() { @@ -1131,45 +1040,32 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { applySnapshotInProgress.Inc() lg := s.getLogger() - if lg != nil { + lg.Info( + "applying snapshot", + zap.Uint64("current-snapshot-index", ep.snapi), + zap.Uint64("current-applied-index", ep.appliedi), + zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index), + zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term), + ) + defer func() { lg.Info( - "applying snapshot", + "applied snapshot", zap.Uint64("current-snapshot-index", ep.snapi), zap.Uint64("current-applied-index", ep.appliedi), zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index), zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term), ) - } else { - plog.Infof("applying snapshot at index %d...", ep.snapi) - } - defer func() { - if lg != nil { - lg.Info( - "applied snapshot", - zap.Uint64("current-snapshot-index", ep.snapi), - zap.Uint64("current-applied-index", ep.appliedi), - zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index), - zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term), - ) - } else { - plog.Infof("finished applying incoming snapshot at index %d", ep.snapi) - } applySnapshotInProgress.Dec() }() if apply.snapshot.Metadata.Index <= ep.appliedi { - if lg != nil { - lg.Panic( - "unexpected leader snapshot from outdated index", - zap.Uint64("current-snapshot-index", ep.snapi), - 
zap.Uint64("current-applied-index", ep.appliedi), - zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index), - zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term), - ) - } else { - plog.Panicf("snapshot index [%d] should > appliedi[%d] + 1", - apply.snapshot.Metadata.Index, ep.appliedi) - } + lg.Panic( + "unexpected leader snapshot from outdated index", + zap.Uint64("current-snapshot-index", ep.snapi), + zap.Uint64("current-applied-index", ep.appliedi), + zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index), + zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term), + ) } // wait for raftNode to persist snapshot onto the disk @@ -1177,51 +1073,27 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot) if err != nil { - if lg != nil { - lg.Panic("failed to open snapshot backend", zap.Error(err)) - } else { - plog.Panic(err) - } + lg.Panic("failed to open snapshot backend", zap.Error(err)) } // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers. if s.lessor != nil { - if lg != nil { - lg.Info("restoring lease store") - } else { - plog.Info("recovering lessor...") - } + lg.Info("restoring lease store") s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write(traceutil.TODO()) }) - if lg != nil { - lg.Info("restored lease store") - } else { - plog.Info("finished recovering lessor") - } + lg.Info("restored lease store") } - if lg != nil { - lg.Info("restoring mvcc store") - } else { - plog.Info("restoring mvcc store...") - } + lg.Info("restoring mvcc store") if err := s.kv.Restore(newbe); err != nil { - if lg != nil { - lg.Panic("failed to restore mvcc store", zap.Error(err)) - } else { - plog.Panicf("restore KV error: %v", err) - } + lg.Panic("failed to restore mvcc store", zap.Error(err)) } s.consistIndex.setConsistentIndex(s.kv.ConsistentIndex()) - if lg != nil { - lg.Info("restored mvcc store") - } else { - plog.Info("finished restoring mvcc store") - } + lg.Info("restored mvcc store") // Closing old backend might block until all the txns // on the backend are finished. 
@@ -1229,113 +1101,55 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { s.bemu.Lock() oldbe := s.be go func() { - if lg != nil { - lg.Info("closing old backend file") - } else { - plog.Info("closing old backend...") - } + lg.Info("closing old backend file") defer func() { - if lg != nil { - lg.Info("closed old backend file") - } else { - plog.Info("finished closing old backend") - } + lg.Info("closed old backend file") }() if err := oldbe.Close(); err != nil { - if lg != nil { - lg.Panic("failed to close old backend", zap.Error(err)) - } else { - plog.Panicf("close backend error: %v", err) - } + lg.Panic("failed to close old backend", zap.Error(err)) } }() s.be = newbe s.bemu.Unlock() - if lg != nil { - lg.Info("restoring alarm store") - } else { - plog.Info("recovering alarms...") - } + lg.Info("restoring alarm store") if err := s.restoreAlarms(); err != nil { - if lg != nil { - lg.Panic("failed to restore alarm store", zap.Error(err)) - } else { - plog.Panicf("restore alarms error: %v", err) - } + lg.Panic("failed to restore alarm store", zap.Error(err)) } - if lg != nil { - lg.Info("restored alarm store") - } else { - plog.Info("finished recovering alarms") - } + lg.Info("restored alarm store") if s.authStore != nil { - if lg != nil { - lg.Info("restoring auth store") - } else { - plog.Info("recovering auth store...") - } + lg.Info("restoring auth store") s.authStore.Recover(newbe) - if lg != nil { - lg.Info("restored auth store") - } else { - plog.Info("finished recovering auth store") - } + lg.Info("restored auth store") } - if lg != nil { - lg.Info("restoring v2 store") - } else { - plog.Info("recovering store v2...") - } + lg.Info("restoring v2 store") if err := s.v2store.Recovery(apply.snapshot.Data); err != nil { - if lg != nil { - lg.Panic("failed to restore v2 store", zap.Error(err)) - } else { - plog.Panicf("recovery store error: %v", err) - } + lg.Panic("failed to restore v2 store", zap.Error(err)) } - if lg != nil { - lg.Info("restored v2 store") - } else { - plog.Info("finished recovering store v2") - } + lg.Info("restored v2 store") s.cluster.SetBackend(newbe) - if lg != nil { - lg.Info("restoring cluster configuration") - } else { - plog.Info("recovering cluster configuration...") - } + lg.Info("restoring cluster configuration") s.cluster.Recover(api.UpdateCapability) - if lg != nil { - lg.Info("restored cluster configuration") - lg.Info("removing old peers from network") - } else { - plog.Info("finished recovering cluster configuration") - plog.Info("removing old peers from network...") - } + lg.Info("restored cluster configuration") + lg.Info("removing old peers from network") // recover raft transport s.r.transport.RemoveAllPeers() - if lg != nil { - lg.Info("removed old peers from network") - lg.Info("adding peers from new cluster configuration") - } else { - plog.Info("finished removing old peers from network") - plog.Info("adding peers from new cluster configuration into network...") - } + lg.Info("removed old peers from network") + lg.Info("adding peers from new cluster configuration") for _, m := range s.cluster.Members() { if m.ID == s.ID() { @@ -1344,11 +1158,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { s.r.transport.AddPeer(m.ID, m.PeerURLs) } - if lg != nil { - lg.Info("added peers from new cluster configuration") - } else { - plog.Info("finished adding peers from new cluster configuration into network...") - } + lg.Info("added peers from new cluster configuration") ep.appliedt = apply.snapshot.Metadata.Term 
ep.appliedi = apply.snapshot.Metadata.Index @@ -1362,15 +1172,12 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) { } firsti := apply.entries[0].Index if firsti > ep.appliedi+1 { - if lg := s.getLogger(); lg != nil { - lg.Panic( - "unexpected committed entry index", - zap.Uint64("current-applied-index", ep.appliedi), - zap.Uint64("first-committed-entry-index", firsti), - ) - } else { - plog.Panicf("first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, ep.appliedi) - } + lg := s.getLogger() + lg.Panic( + "unexpected committed entry index", + zap.Uint64("current-applied-index", ep.appliedi), + zap.Uint64("first-committed-entry-index", firsti), + ) } var ents []raftpb.Entry if ep.appliedi+1-firsti < uint64(len(apply.entries)) { @@ -1390,17 +1197,14 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) { return } - if lg := s.getLogger(); lg != nil { - lg.Info( - "triggering snapshot", - zap.String("local-member-id", s.ID().String()), - zap.Uint64("local-member-applied-index", ep.appliedi), - zap.Uint64("local-member-snapshot-index", ep.snapi), - zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount), - ) - } else { - plog.Infof("start to snapshot (applied: %d, lastsnap: %d)", ep.appliedi, ep.snapi) - } + lg := s.getLogger() + lg.Info( + "triggering snapshot", + zap.String("local-member-id", s.ID().String()), + zap.Uint64("local-member-applied-index", ep.appliedi), + zap.Uint64("local-member-snapshot-index", ep.snapi), + zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount), + ) s.snapshot(ep.appliedi, ep.confState) ep.snapi = ep.appliedi @@ -1423,16 +1227,13 @@ func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) er now := time.Now() interval := time.Duration(s.Cfg.TickMs) * time.Millisecond - if lg := s.getLogger(); lg != nil { - lg.Info( - "leadership transfer starting", - zap.String("local-member-id", s.ID().String()), - zap.String("current-leader-member-id", types.ID(lead).String()), - zap.String("transferee-member-id", types.ID(transferee).String()), - ) - } else { - plog.Infof("%s starts leadership transfer from %s to %s", s.ID(), types.ID(lead), types.ID(transferee)) - } + lg := s.getLogger() + lg.Info( + "leadership transfer starting", + zap.String("local-member-id", s.ID().String()), + zap.String("current-leader-member-id", types.ID(lead).String()), + zap.String("transferee-member-id", types.ID(transferee).String()), + ) s.r.TransferLeadership(ctx, lead, transferee) for s.Lead() != transferee { @@ -1444,45 +1245,34 @@ func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) er } // TODO: drain all requests, or drop all messages to the old leader - if lg := s.getLogger(); lg != nil { - lg.Info( - "leadership transfer finished", - zap.String("local-member-id", s.ID().String()), - zap.String("old-leader-member-id", types.ID(lead).String()), - zap.String("new-leader-member-id", types.ID(transferee).String()), - zap.Duration("took", time.Since(now)), - ) - } else { - plog.Infof("%s finished leadership transfer from %s to %s (took %v)", s.ID(), types.ID(lead), types.ID(transferee), time.Since(now)) - } + lg.Info( + "leadership transfer finished", + zap.String("local-member-id", s.ID().String()), + zap.String("old-leader-member-id", types.ID(lead).String()), + zap.String("new-leader-member-id", types.ID(transferee).String()), + zap.Duration("took", time.Since(now)), + ) return nil } // TransferLeadership transfers the leader to the chosen transferee. 
func (s *EtcdServer) TransferLeadership() error { + lg := s.getLogger() if !s.isLeader() { - if lg := s.getLogger(); lg != nil { - lg.Info( - "skipped leadership transfer; local server is not leader", - zap.String("local-member-id", s.ID().String()), - zap.String("current-leader-member-id", types.ID(s.Lead()).String()), - ) - } else { - plog.Printf("skipped leadership transfer for stopping non-leader member") - } + lg.Info( + "skipped leadership transfer; local server is not leader", + zap.String("local-member-id", s.ID().String()), + zap.String("current-leader-member-id", types.ID(s.Lead()).String()), + ) return nil } if !s.hasMultipleVotingMembers() { - if lg := s.getLogger(); lg != nil { - lg.Info( - "skipped leadership transfer for single voting member cluster", - zap.String("local-member-id", s.ID().String()), - zap.String("current-leader-member-id", types.ID(s.Lead()).String()), - ) - } else { - plog.Printf("skipped leadership transfer for single voting member cluster") - } + lg.Info( + "skipped leadership transfer for single voting member cluster", + zap.String("local-member-id", s.ID().String()), + zap.String("current-leader-member-id", types.ID(s.Lead()).String()), + ) return nil } @@ -1515,12 +1305,9 @@ func (s *EtcdServer) HardStop() { // Stop terminates the Server and performs any necessary finalization. // Do and Process cannot be called after Stop has been invoked. func (s *EtcdServer) Stop() { + lg := s.getLogger() if err := s.TransferLeadership(); err != nil { - if lg := s.getLogger(); lg != nil { - lg.Warn("leadership transfer failed", zap.String("local-member-id", s.ID().String()), zap.Error(err)) - } else { - plog.Warningf("%s failed to transfer leadership (%v)", s.ID(), err) - } + lg.Warn("leadership transfer failed", zap.String("local-member-id", s.ID().String()), zap.Error(err)) } s.HardStop() } @@ -1607,36 +1394,29 @@ func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) ([]* } func (s *EtcdServer) mayAddMember(memb membership.Member) error { + lg := s.getLogger() if !s.Cfg.StrictReconfigCheck { return nil } // protect quorum when adding voting member if !memb.IsLearner && !s.cluster.IsReadyToAddVotingMember() { - if lg := s.getLogger(); lg != nil { - lg.Warn( - "rejecting member add request; not enough healthy members", - zap.String("local-member-id", s.ID().String()), - zap.String("requested-member-add", fmt.Sprintf("%+v", memb)), - zap.Error(ErrNotEnoughStartedMembers), - ) - } else { - plog.Warningf("not enough started members, rejecting member add %+v", memb) - } + lg.Warn( + "rejecting member add request; not enough healthy members", + zap.String("local-member-id", s.ID().String()), + zap.String("requested-member-add", fmt.Sprintf("%+v", memb)), + zap.Error(ErrNotEnoughStartedMembers), + ) return ErrNotEnoughStartedMembers } if !isConnectedFullySince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), s.cluster.VotingMembers()) { - if lg := s.getLogger(); lg != nil { - lg.Warn( - "rejecting member add request; local member has not been connected to all peers, reconfigure breaks active quorum", - zap.String("local-member-id", s.ID().String()), - zap.String("requested-member-add", fmt.Sprintf("%+v", memb)), - zap.Error(ErrUnhealthy), - ) - } else { - plog.Warningf("not healthy for reconfigure, rejecting member add %+v", memb) - } + lg.Warn( + "rejecting member add request; local member has not been connected to all peers, reconfigure breaks active quorum", + zap.String("local-member-id", s.ID().String()), + 
zap.String("requested-member-add", fmt.Sprintf("%+v", memb)), + zap.Error(ErrUnhealthy), + ) return ErrUnhealthy } @@ -1740,6 +1520,7 @@ func (s *EtcdServer) promoteMember(ctx context.Context, id uint64) ([]*membershi } func (s *EtcdServer) mayPromoteMember(id types.ID) error { + lg := s.getLogger() err := s.isLearnerReady(uint64(id)) if err != nil { return err @@ -1749,16 +1530,12 @@ func (s *EtcdServer) mayPromoteMember(id types.ID) error { return nil } if !s.cluster.IsReadyToPromoteMember(uint64(id)) { - if lg := s.getLogger(); lg != nil { - lg.Warn( - "rejecting member promote request; not enough healthy members", - zap.String("local-member-id", s.ID().String()), - zap.String("requested-member-remove-id", id.String()), - zap.Error(ErrNotEnoughStartedMembers), - ) - } else { - plog.Warningf("not enough started members, rejecting promote member %s", id) - } + lg.Warn( + "rejecting member promote request; not enough healthy members", + zap.String("local-member-id", s.ID().String()), + zap.String("requested-member-remove-id", id.String()), + zap.Error(ErrNotEnoughStartedMembers), + ) return ErrNotEnoughStartedMembers } @@ -1804,6 +1581,7 @@ func (s *EtcdServer) mayRemoveMember(id types.ID) error { return nil } + lg := s.getLogger() isLearner := s.cluster.IsMemberExist(id) && s.cluster.Member(id).IsLearner // no need to check quorum when removing non-voting member if isLearner { @@ -1811,16 +1589,12 @@ func (s *EtcdServer) mayRemoveMember(id types.ID) error { } if !s.cluster.IsReadyToRemoveVotingMember(uint64(id)) { - if lg := s.getLogger(); lg != nil { - lg.Warn( - "rejecting member remove request; not enough healthy members", - zap.String("local-member-id", s.ID().String()), - zap.String("requested-member-remove-id", id.String()), - zap.Error(ErrNotEnoughStartedMembers), - ) - } else { - plog.Warningf("not enough started members, rejecting remove member %s", id) - } + lg.Warn( + "rejecting member remove request; not enough healthy members", + zap.String("local-member-id", s.ID().String()), + zap.String("requested-member-remove-id", id.String()), + zap.Error(ErrNotEnoughStartedMembers), + ) return ErrNotEnoughStartedMembers } @@ -1833,17 +1607,13 @@ func (s *EtcdServer) mayRemoveMember(id types.ID) error { m := s.cluster.VotingMembers() active := numConnectedSince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), m) if (active - 1) < 1+((len(m)-1)/2) { - if lg := s.getLogger(); lg != nil { - lg.Warn( - "rejecting member remove request; local member has not been connected to all peers, reconfigure breaks active quorum", - zap.String("local-member-id", s.ID().String()), - zap.String("requested-member-remove", id.String()), - zap.Int("active-peers", active), - zap.Error(ErrUnhealthy), - ) - } else { - plog.Warningf("reconfigure breaks active quorum, rejecting remove member %s", id) - } + lg.Warn( + "rejecting member remove request; local member has not been connected to all peers, reconfigure breaks active quorum", + zap.String("local-member-id", s.ID().String()), + zap.String("requested-member-remove", id.String()), + zap.Int("active-peers", active), + zap.Error(ErrUnhealthy), + ) return ErrUnhealthy } @@ -1935,6 +1705,7 @@ type confChangeResponse struct { // then waits for it to be applied to the server. It // will block until the change is performed or there is an error. 
func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*membership.Member, error) { + lg := s.getLogger() cc.ID = s.reqIDGen.Next() ch := s.w.Register(cc.ID) @@ -1947,21 +1718,15 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*me select { case x := <-ch: if x == nil { - if lg := s.getLogger(); lg != nil { - lg.Panic("failed to configure") - } else { - plog.Panicf("configure trigger value should never be nil") - } + lg.Panic("failed to configure") } resp := x.(*confChangeResponse) - if lg := s.getLogger(); lg != nil { - lg.Info( - "applied a configuration change through raft", - zap.String("local-member-id", s.ID().String()), - zap.String("raft-conf-change", cc.Type.String()), - zap.String("raft-conf-change-node-id", types.ID(cc.NodeID).String()), - ) - } + lg.Info( + "applied a configuration change through raft", + zap.String("local-member-id", s.ID().String()), + zap.String("raft-conf-change", cc.Type.String()), + zap.String("raft-conf-change-node-id", types.ID(cc.NodeID).String()), + ) return resp.membs, resp.err case <-ctx.Done(): @@ -2060,13 +1825,10 @@ func (s *EtcdServer) publishV3(timeout time.Duration) { // process publish requests through rafthttp // TODO: Deprecate v2 store in 3.6 func (s *EtcdServer) publish(timeout time.Duration) { + lg := s.getLogger() b, err := json.Marshal(s.attributes) if err != nil { - if lg := s.getLogger(); lg != nil { - lg.Panic("failed to marshal JSON", zap.Error(err)) - } else { - plog.Panicf("json marshal error: %v", err) - } + lg.Panic("failed to marshal JSON", zap.Error(err)) return } req := pb.Request{ @@ -2082,47 +1844,35 @@ func (s *EtcdServer) publish(timeout time.Duration) { switch err { case nil: close(s.readych) - if lg := s.getLogger(); lg != nil { - lg.Info( - "published local member to cluster through raft", - zap.String("local-member-id", s.ID().String()), - zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)), - zap.String("request-path", req.Path), - zap.String("cluster-id", s.cluster.ID().String()), - zap.Duration("publish-timeout", timeout), - ) - } else { - plog.Infof("published %+v to cluster %s", s.attributes, s.cluster.ID()) - } + lg.Info( + "published local member to cluster through raft", + zap.String("local-member-id", s.ID().String()), + zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)), + zap.String("request-path", req.Path), + zap.String("cluster-id", s.cluster.ID().String()), + zap.Duration("publish-timeout", timeout), + ) return case ErrStopped: - if lg := s.getLogger(); lg != nil { - lg.Warn( - "stopped publish because server is stopped", - zap.String("local-member-id", s.ID().String()), - zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)), - zap.Duration("publish-timeout", timeout), - zap.Error(err), - ) - } else { - plog.Infof("aborting publish because server is stopped") - } + lg.Warn( + "stopped publish because server is stopped", + zap.String("local-member-id", s.ID().String()), + zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)), + zap.Duration("publish-timeout", timeout), + zap.Error(err), + ) return default: - if lg := s.getLogger(); lg != nil { - lg.Warn( - "failed to publish local member to cluster through raft", - zap.String("local-member-id", s.ID().String()), - zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)), - zap.String("request-path", req.Path), - zap.Duration("publish-timeout", timeout), - zap.Error(err), - ) - } else { - plog.Errorf("publish 
error: %v", err) - } + lg.Warn( + "failed to publish local member to cluster through raft", + zap.String("local-member-id", s.ID().String()), + zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)), + zap.String("request-path", req.Path), + zap.Duration("publish-timeout", timeout), + zap.Error(err), + ) } } } @@ -2140,9 +1890,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) { now := time.Now() s.r.transport.SendSnapshot(merged) - if lg != nil { - lg.Info("sending merged snapshot", fields...) - } + lg.Info("sending merged snapshot", fields...) s.goAttach(func() { select { @@ -2160,14 +1908,10 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) { atomic.AddInt64(&s.inflightSnapshots, -1) - if lg != nil { - lg.Info("sent merged snapshot", append(fields, zap.Duration("took", time.Since(now)))...) - } + lg.Info("sent merged snapshot", append(fields, zap.Duration("took", time.Since(now)))...) case <-s.stopping: - if lg != nil { - lg.Warn("canceled sending merged snapshot; server stopping", fields...) - } + lg.Warn("canceled sending merged snapshot; server stopping", fields...) return } }) @@ -2202,14 +1946,11 @@ func (s *EtcdServer) apply( s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err}) default: - if lg := s.getLogger(); lg != nil { - lg.Panic( - "unknown entry type; must be either EntryNormal or EntryConfChange", - zap.String("type", e.Type.String()), - ) - } else { - plog.Panicf("entry type should be either EntryNormal or EntryConfChange") - } + lg := s.getLogger() + lg.Panic( + "unknown entry type; must be either EntryNormal or EntryConfChange", + zap.String("type", e.Type.String()), + ) } appliedi, appliedt = e.Index, e.Term } @@ -2282,16 +2023,13 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { return } - if lg := s.getLogger(); lg != nil { - lg.Warn( - "message exceeded backend quota; raising alarm", - zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes), - zap.String("quota-size", humanize.Bytes(uint64(s.Cfg.QuotaBackendBytes))), - zap.Error(ar.err), - ) - } else { - plog.Errorf("applying raft message exceeded backend quota") - } + lg := s.getLogger() + lg.Warn( + "message exceeded backend quota; raising alarm", + zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes), + zap.String("quota-size", humanize.Bytes(uint64(s.Cfg.QuotaBackendBytes))), + zap.Error(ar.err), + ) s.goAttach(func() { a := &pb.AlarmRequest{ @@ -2319,22 +2057,14 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode: confChangeContext := new(membership.ConfigChangeContext) if err := json.Unmarshal(cc.Context, confChangeContext); err != nil { - if lg != nil { - lg.Panic("failed to unmarshal member", zap.Error(err)) - } else { - plog.Panicf("unmarshal member should never fail: %v", err) - } + lg.Panic("failed to unmarshal member", zap.Error(err)) } if cc.NodeID != uint64(confChangeContext.Member.ID) { - if lg != nil { - lg.Panic( - "got different member ID", - zap.String("member-id-from-config-change-entry", types.ID(cc.NodeID).String()), - zap.String("member-id-from-message", confChangeContext.Member.ID.String()), - ) - } else { - plog.Panicf("nodeID should always be equal to member ID") - } + lg.Panic( + "got different member ID", + zap.String("member-id-from-config-change-entry", types.ID(cc.NodeID).String()), + zap.String("member-id-from-message", confChangeContext.Member.ID.String()), + ) } if confChangeContext.IsPromote { 
s.cluster.PromoteMember(confChangeContext.Member.ID) @@ -2366,22 +2096,14 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con case raftpb.ConfChangeUpdateNode: m := new(membership.Member) if err := json.Unmarshal(cc.Context, m); err != nil { - if lg != nil { - lg.Panic("failed to unmarshal member", zap.Error(err)) - } else { - plog.Panicf("unmarshal member should never fail: %v", err) - } + lg.Panic("failed to unmarshal member", zap.Error(err)) } if cc.NodeID != uint64(m.ID) { - if lg != nil { - lg.Panic( - "got different member ID", - zap.String("member-id-from-config-change-entry", types.ID(cc.NodeID).String()), - zap.String("member-id-from-message", m.ID.String()), - ) - } else { - plog.Panicf("nodeID should always be equal to member ID") - } + lg.Panic( + "got different member ID", + zap.String("member-id-from-config-change-entry", types.ID(cc.NodeID).String()), + zap.String("member-id-from-message", m.ID.String()), + ) } s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes) if m.ID != s.id { @@ -2409,11 +2131,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { // TODO: current store will never fail to do a snapshot // what should we do if the store might fail? if err != nil { - if lg != nil { - lg.Panic("failed to save v2 store", zap.Error(err)) - } else { - plog.Panicf("store save should never fail: %v", err) - } + lg.Panic("failed to save v2 store", zap.Error(err)) } snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d) if err != nil { @@ -2422,29 +2140,17 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { if err == raft.ErrSnapOutOfDate { return } - if lg != nil { - lg.Panic("failed to create snapshot", zap.Error(err)) - } else { - plog.Panicf("unexpected create snapshot error %v", err) - } + lg.Panic("failed to create snapshot", zap.Error(err)) } // SaveSnap saves the snapshot and releases the locked wal files // to the snapshot index. if err = s.r.storage.SaveSnap(snap); err != nil { - if lg != nil { - lg.Panic("failed to save snapshot", zap.Error(err)) - } else { - plog.Fatalf("save snapshot error: %v", err) - } - } - if lg != nil { - lg.Info( - "saved snapshot", - zap.Uint64("snapshot-index", snap.Metadata.Index), - ) - } else { - plog.Infof("saved snapshot at index %d", snap.Metadata.Index) + lg.Panic("failed to save snapshot", zap.Error(err)) } + lg.Info( + "saved snapshot", + zap.Uint64("snapshot-index", snap.Metadata.Index), + ) // When sending a snapshot, etcd will pause compaction. // After receives a snapshot, the slow follower needs to get all the entries right after @@ -2452,11 +2158,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { // the snapshot sent might already be compacted. It happens when the snapshot takes long time // to send and save. Pausing compaction avoids triggering a snapshot sending cycle. 
if atomic.LoadInt64(&s.inflightSnapshots) != 0 { - if lg != nil { - lg.Info("skip compaction since there is an inflight snapshot") - } else { - plog.Infof("skip compaction since there is an inflight snapshot") - } + lg.Info("skip compaction since there is an inflight snapshot") return } @@ -2473,20 +2175,12 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { if err == raft.ErrCompacted { return } - if lg != nil { - lg.Panic("failed to compact", zap.Error(err)) - } else { - plog.Panicf("unexpected compaction error %v", err) - } - } - if lg != nil { - lg.Info( - "compacted Raft logs", - zap.Uint64("compact-index", compacti), - ) - } else { - plog.Infof("compacted raft log at %d", compacti) + lg.Panic("failed to compact", zap.Error(err)) } + lg.Info( + "compacted Raft logs", + zap.Uint64("compact-index", compacti), + ) }) } @@ -2567,24 +2261,16 @@ func (s *EtcdServer) updateClusterVersion(ver string) { lg := s.getLogger() if s.cluster.Version() == nil { - if lg != nil { - lg.Info( - "setting up initial cluster version", - zap.String("cluster-version", version.Cluster(ver)), - ) - } else { - plog.Infof("setting up the initial cluster version to %s", version.Cluster(ver)) - } + lg.Info( + "setting up initial cluster version", + zap.String("cluster-version", version.Cluster(ver)), + ) } else { - if lg != nil { - lg.Info( - "updating cluster version", - zap.String("from", version.Cluster(s.cluster.Version().String())), - zap.String("to", version.Cluster(ver)), - ) - } else { - plog.Infof("updating the cluster version from %s to %s", version.Cluster(s.cluster.Version().String()), version.Cluster(ver)) - } + lg.Info( + "updating cluster version", + zap.String("from", version.Cluster(s.cluster.Version().String())), + zap.String("to", version.Cluster(ver)), + ) } req := membershippb.ClusterVersionSetRequest{Ver: ver} @@ -2595,25 +2281,15 @@ func (s *EtcdServer) updateClusterVersion(ver string) { switch err { case nil: - if lg != nil { - lg.Info("cluster version is updated", zap.String("cluster-version", version.Cluster(ver))) - } + lg.Info("cluster version is updated", zap.String("cluster-version", version.Cluster(ver))) return case ErrStopped: - if lg != nil { - lg.Warn("aborting cluster version update; server is stopped", zap.Error(err)) - } else { - plog.Infof("aborting update cluster version because server is stopped") - } + lg.Warn("aborting cluster version update; server is stopped", zap.Error(err)) return default: - if lg != nil { - lg.Warn("failed to update cluster version", zap.Error(err)) - } else { - plog.Errorf("error updating cluster version (%v)", err) - } + lg.Warn("failed to update cluster version", zap.Error(err)) } } @@ -2682,11 +2358,8 @@ func (s *EtcdServer) goAttach(f func()) { defer s.wgMu.RUnlock() select { case <-s.stopping: - if lg := s.getLogger(); lg != nil { - lg.Warn("server has stopped; skipping goAttach") - } else { - plog.Warning("server has stopped (skipping goAttach)") - } + lg := s.getLogger() + lg.Warn("server has stopped; skipping goAttach") return default: } diff --git a/etcdserver/snapshot_merge.go b/etcdserver/snapshot_merge.go index 417776813..605138a25 100644 --- a/etcdserver/snapshot_merge.go +++ b/etcdserver/snapshot_merge.go @@ -29,22 +29,19 @@ import ( // a snapshot of v2 store inside raft.Snapshot as []byte, a snapshot of v3 KV in the top level message // as ReadCloser. 
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message { + lg := s.getLogger() // get a snapshot of v2 store as []byte clone := s.v2store.Clone() d, err := clone.SaveNoCopy() if err != nil { - if lg := s.getLogger(); lg != nil { - lg.Panic("failed to save v2 store data", zap.Error(err)) - } else { - plog.Panicf("store save should never fail: %v", err) - } + lg.Panic("failed to save v2 store data", zap.Error(err)) } // commit kv to write metadata(for example: consistent index). s.KV().Commit() dbsnap := s.be.Snapshot() // get a snapshot of v3 KV as readCloser - rc := newSnapshotReaderCloser(s.getLogger(), dbsnap) + rc := newSnapshotReaderCloser(lg, dbsnap) // put the []byte snapshot of store into raft snapshot and return the merged snapshot with // KV readCloser snapshot. @@ -66,34 +63,22 @@ func newSnapshotReaderCloser(lg *zap.Logger, snapshot backend.Snapshot) io.ReadC go func() { n, err := snapshot.WriteTo(pw) if err == nil { - if lg != nil { - lg.Info( - "sent database snapshot to writer", - zap.Int64("bytes", n), - zap.String("size", humanize.Bytes(uint64(n))), - ) - } else { - plog.Infof("wrote database snapshot out [total bytes: %d]", n) - } + lg.Info( + "sent database snapshot to writer", + zap.Int64("bytes", n), + zap.String("size", humanize.Bytes(uint64(n))), + ) } else { - if lg != nil { - lg.Warn( - "failed to send database snapshot to writer", - zap.String("size", humanize.Bytes(uint64(n))), - zap.Error(err), - ) - } else { - plog.Warningf("failed to write database snapshot out [written bytes: %d]: %v", n, err) - } + lg.Warn( + "failed to send database snapshot to writer", + zap.String("size", humanize.Bytes(uint64(n))), + zap.Error(err), + ) } pw.CloseWithError(err) err = snapshot.Close() if err != nil { - if lg != nil { - lg.Panic("failed to close database snapshot", zap.Error(err)) - } else { - plog.Panicf("failed to close database snapshot: %v", err) - } + lg.Panic("failed to close database snapshot", zap.Error(err)) } }() return pr diff --git a/etcdserver/storage.go b/etcdserver/storage.go index d57b6f9a5..1b4a5214e 100644 --- a/etcdserver/storage.go +++ b/etcdserver/storage.go @@ -74,34 +74,18 @@ func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot) (w *wal.WAL, id repaired := false for { if w, err = wal.Open(lg, waldir, snap); err != nil { - if lg != nil { - lg.Fatal("failed to open WAL", zap.Error(err)) - } else { - plog.Fatalf("open wal error: %v", err) - } + lg.Fatal("failed to open WAL", zap.Error(err)) } if wmetadata, st, ents, err = w.ReadAll(); err != nil { w.Close() // we can only repair ErrUnexpectedEOF and we never repair twice. 
if repaired || err != io.ErrUnexpectedEOF { - if lg != nil { - lg.Fatal("failed to read WAL, cannot be repaired", zap.Error(err)) - } else { - plog.Fatalf("read wal error (%v) and cannot be repaired", err) - } + lg.Fatal("failed to read WAL, cannot be repaired", zap.Error(err)) } if !wal.Repair(lg, waldir) { - if lg != nil { - lg.Fatal("failed to repair WAL", zap.Error(err)) - } else { - plog.Fatalf("WAL error (%v) cannot be repaired", err) - } + lg.Fatal("failed to repair WAL", zap.Error(err)) } else { - if lg != nil { - lg.Info("repaired WAL", zap.Error(err)) - } else { - plog.Infof("repaired WAL error (%v)", err) - } + lg.Info("repaired WAL", zap.Error(err)) repaired = true } continue diff --git a/etcdserver/util.go b/etcdserver/util.go index fe5024ef0..609d19959 100644 --- a/etcdserver/util.go +++ b/etcdserver/util.go @@ -140,25 +140,15 @@ func warnOfExpensiveReadOnlyRangeRequest(lg *zap.Logger, now time.Time, reqStrin func warnOfExpensiveGenericRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) { d := time.Since(now) if d > warnApplyDuration { - if lg != nil { - lg.Warn( - "apply request took too long", - zap.Duration("took", d), - zap.Duration("expected-duration", warnApplyDuration), - zap.String("prefix", prefix), - zap.String("request", reqStringer.String()), - zap.String("response", resp), - zap.Error(err), - ) - } else { - var result string - if err != nil { - result = fmt.Sprintf("error:%v", err) - } else { - result = resp - } - plog.Warningf("%srequest %q with result %q took too long (%v) to execute", prefix, reqStringer.String(), result, d) - } + lg.Warn( + "apply request took too long", + zap.Duration("took", d), + zap.Duration("expected-duration", warnApplyDuration), + zap.String("prefix", prefix), + zap.String("request", reqStringer.String()), + zap.String("response", resp), + zap.Error(err), + ) slowApplies.Inc() } } diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index 49921a93a..0ab87f7dd 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -419,15 +419,11 @@ func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest checkedRevision, err := s.AuthStore().CheckPassword(r.Name, r.Password) if err != nil { if err != auth.ErrAuthNotEnabled { - if lg != nil { - lg.Warn( - "invalid authentication was requested", - zap.String("user", r.Name), - zap.Error(err), - ) - } else { - plog.Errorf("invalid authentication request to user %s was issued", r.Name) - } + lg.Warn( + "invalid authentication was requested", + zap.String("user", r.Name), + zap.Error(err), + ) } return nil, err } @@ -451,11 +447,7 @@ func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest break } - if lg != nil { - lg.Info("revision when password checked became stale; retrying") - } else { - plog.Infof("revision when password checked is obsolete, retrying") - } + lg.Info("revision when password checked became stale; retrying") } return resp.(*pb.AuthenticateResponse), nil @@ -707,11 +699,7 @@ func (s *EtcdServer) linearizableReadLoop() { if err == raft.ErrStopped { return } - if lg != nil { - lg.Warn("failed to get read index from Raft", zap.Error(err)) - } else { - plog.Errorf("failed to get read index from raft: %v", err) - } + lg.Warn("failed to get read index from Raft", zap.Error(err)) readIndexFailed.Inc() nr.notify(err) continue @@ -733,15 +721,11 @@ func (s *EtcdServer) linearizableReadLoop() { if len(rs.RequestCtx) == 8 { id2 = binary.BigEndian.Uint64(rs.RequestCtx) } - if 
lg != nil {
-				lg.Warn(
-					"ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
-					zap.Uint64("sent-request-id", id1),
-					zap.Uint64("received-request-id", id2),
-				)
-			} else {
-				plog.Warningf("ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader (request ID want %d, got %d)", id1, id2)
-			}
+			lg.Warn(
+				"ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
+				zap.Uint64("sent-request-id", id1),
+				zap.Uint64("received-request-id", id2),
+			)
 			slowReadIndex.Inc()
 		}
 	case <-leaderChangedNotifier:
@@ -750,11 +734,7 @@ func (s *EtcdServer) linearizableReadLoop() {
 			// return a retryable error.
 			nr.notify(ErrLeaderChanged)
 		case <-time.After(s.Cfg.ReqTimeout()):
-			if lg != nil {
-				lg.Warn("timed out waiting for read index response (local node might have slow network)", zap.Duration("timeout", s.Cfg.ReqTimeout()))
-			} else {
-				plog.Warningf("timed out waiting for read index response (local node might have slow network)")
-			}
+			lg.Warn("timed out waiting for read index response (local node might have slow network)", zap.Duration("timeout", s.Cfg.ReqTimeout()))
 			nr.notify(ErrTimeout)
 			timeout = true
 			slowReadIndex.Inc()
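The read-index hunk above compares id1 and id2 because the request context carries an 8-byte, big-endian request ID; that is also why it checks len(rs.RequestCtx) == 8 before decoding. A small standalone sketch of that round trip, using only the encoding/binary calls the hunk itself relies on (the variable names here are illustrative, not etcd's):

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// Encode a request ID the way a read-index requester would: 8 bytes, big-endian.
	var sent uint64 = 42
	ctx := make([]byte, 8)
	binary.BigEndian.PutUint64(ctx, sent)

	// Decode it the way the loop above does before comparing request IDs.
	received := binary.BigEndian.Uint64(ctx)
	fmt.Println(received == sent) // true: the response belongs to the outstanding request
}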