etcdserver: remove capnslog (#11611)

remove capnslog from etcdserver pkg, except etcdserver/api.
Jingyi Hu authored 2020-02-11 08:54:14 -08:00, committed by GitHub
parent a0bb739c4e
commit c94782cd55
13 changed files with 556 additions and 1184 deletions
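All of the hunks below make the same mechanical change: call sites that previously branched on a nil *zap.Logger and fell back to the package-level capnslog logger (plog) now log through zap unconditionally, and constructors substitute zap.NewNop() when the caller passes no logger. The sketch below is an illustrative distillation of that pattern, not code taken from any file in the commit; worker and doSomething are hypothetical stand-ins.

package main

import (
	"errors"

	"go.uber.org/zap"
)

// Before this commit, call sites looked like:
//
//	if lg != nil {
//		lg.Warn("failed to do something", zap.Error(err))
//	} else {
//		plog.Warningf("failed to do something: %v", err)
//	}

// After: the constructor normalizes the logger once, so every call site
// has exactly one code path. worker and doSomething are hypothetical.
type worker struct {
	lg *zap.Logger
}

func newWorker(lg *zap.Logger) *worker {
	if lg == nil {
		// Same fallback the commit adds in NewApplierV2 and
		// getClusterFromRemotePeers: a no-op logger instead of plog.
		lg = zap.NewNop()
	}
	return &worker{lg: lg}
}

func (w *worker) run() {
	if err := doSomething(); err != nil {
		w.lg.Warn("failed to do something", zap.Error(err))
	}
}

func doSomething() error { return errors.New("example failure") }

func main() {
	lg, _ := zap.NewDevelopment()
	newWorker(lg).run()
	newWorker(nil).run() // logs are silently dropped; no nil-pointer panic
}

With the nop fallback in place, the remaining nil-logger guards become dead weight, which is why the hunks below can drop the else/plog branches wholesale.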


@@ -553,31 +553,19 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat
case *pb.RequestOp_RequestRange:
resp, err := a.Range(context.TODO(), txn, tv.RequestRange)
if err != nil {
if lg != nil {
lg.Panic("unexpected error during txn", zap.Error(err))
} else {
plog.Panicf("unexpected error during txn: %v", err)
}
lg.Panic("unexpected error during txn", zap.Error(err))
}
respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp
case *pb.RequestOp_RequestPut:
resp, _, err := a.Put(txn, tv.RequestPut)
if err != nil {
if lg != nil {
lg.Panic("unexpected error during txn", zap.Error(err))
} else {
plog.Panicf("unexpected error during txn: %v", err)
}
lg.Panic("unexpected error during txn", zap.Error(err))
}
respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp
case *pb.RequestOp_RequestDeleteRange:
resp, err := a.DeleteRange(txn, tv.RequestDeleteRange)
if err != nil {
if lg != nil {
lg.Panic("unexpected error during txn", zap.Error(err))
} else {
plog.Panicf("unexpected error during txn: %v", err)
}
lg.Panic("unexpected error during txn", zap.Error(err))
}
respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp
case *pb.RequestOp_RequestTxn:
@@ -655,22 +643,14 @@ func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error)
break
}
if lg != nil {
lg.Warn("alarm raised", zap.String("alarm", m.Alarm.String()), zap.String("from", types.ID(m.MemberID).String()))
} else {
plog.Warningf("alarm %v raised by peer %s", m.Alarm, types.ID(m.MemberID))
}
lg.Warn("alarm raised", zap.String("alarm", m.Alarm.String()), zap.String("from", types.ID(m.MemberID).String()))
switch m.Alarm {
case pb.AlarmType_CORRUPT:
a.s.applyV3 = newApplierV3Corrupt(a)
case pb.AlarmType_NOSPACE:
a.s.applyV3 = newApplierV3Capped(a)
default:
if lg != nil {
lg.Warn("unimplemented alarm activation", zap.String("alarm", fmt.Sprintf("%+v", m)))
} else {
plog.Errorf("unimplemented alarm activation (%+v)", m)
}
lg.Warn("unimplemented alarm activation", zap.String("alarm", fmt.Sprintf("%+v", m)))
}
case pb.AlarmRequest_DEACTIVATE:
m := a.s.alarmStore.Deactivate(types.ID(ar.MemberID), ar.Alarm)
@@ -686,18 +666,10 @@ func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error)
switch m.Alarm {
case pb.AlarmType_NOSPACE, pb.AlarmType_CORRUPT:
// TODO: check kv hash before deactivating CORRUPT?
if lg != nil {
lg.Warn("alarm disarmed", zap.String("alarm", m.Alarm.String()), zap.String("from", types.ID(m.MemberID).String()))
} else {
plog.Infof("alarm disarmed %+v", ar)
}
lg.Warn("alarm disarmed", zap.String("alarm", m.Alarm.String()), zap.String("from", types.ID(m.MemberID).String()))
a.s.applyV3 = a.s.newApplierV3()
default:
if lg != nil {
lg.Warn("unimplemented alarm deactivation", zap.String("alarm", fmt.Sprintf("%+v", m)))
} else {
plog.Errorf("unimplemented alarm deactivation (%+v)", m)
}
lg.Warn("unimplemented alarm deactivation", zap.String("alarm", fmt.Sprintf("%+v", m)))
}
default:
return nil, nil


@@ -36,6 +36,9 @@ type ApplierV2 interface {
}
func NewApplierV2(lg *zap.Logger, s v2store.Store, c *membership.RaftCluster) ApplierV2 {
if lg == nil {
lg = zap.NewNop()
}
return &applierV2store{lg: lg, store: s, cluster: c}
}
@@ -77,11 +80,7 @@ func (a *applierV2store) Put(r *RequestV2) Response {
id := membership.MustParseMemberIDFromKey(path.Dir(r.Path))
var attr membership.Attributes
if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
if a.lg != nil {
a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
} else {
plog.Panicf("unmarshal %s should never fail: %v", r.Val, err)
}
a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
}
if a.cluster != nil {
a.cluster.UpdateAttributes(id, attr)


@@ -75,22 +75,15 @@ func openBackend(cfg ServerConfig) backend.Backend {
select {
case be := <-beOpened:
if cfg.Logger != nil {
cfg.Logger.Info("opened backend db", zap.String("path", fn), zap.Duration("took", time.Since(now)))
}
cfg.Logger.Info("opened backend db", zap.String("path", fn), zap.Duration("took", time.Since(now)))
return be
case <-time.After(10 * time.Second):
if cfg.Logger != nil {
cfg.Logger.Info(
"db file is flocked by another process, or taking too long",
zap.String("path", fn),
zap.Duration("took", time.Since(now)),
)
} else {
plog.Warningf("another etcd process is using %q and holds the file lock, or loading backend file is taking >10 seconds", fn)
plog.Warningf("waiting for it to exit before starting...")
}
cfg.Logger.Info(
"db file is flocked by another process, or taking too long",
zap.String("path", fn),
zap.Duration("took", time.Since(now)),
)
}
return <-beOpened


@@ -63,6 +63,9 @@ func GetClusterFromRemotePeers(lg *zap.Logger, urls []string, rt http.RoundTripp
// If logerr is true, it prints out more error messages.
func getClusterFromRemotePeers(lg *zap.Logger, urls []string, timeout time.Duration, logerr bool, rt http.RoundTripper) (*membership.RaftCluster, error) {
if lg == nil {
lg = zap.NewNop()
}
cc := &http.Client{
Transport: rt,
Timeout: timeout,
@@ -72,11 +75,7 @@ func getClusterFromRemotePeers(lg *zap.Logger, urls []string, timeout time.Durat
resp, err := cc.Get(addr)
if err != nil {
if logerr {
if lg != nil {
lg.Warn("failed to get cluster response", zap.String("address", addr), zap.Error(err))
} else {
plog.Warningf("could not get cluster response from %s: %v", u, err)
}
lg.Warn("failed to get cluster response", zap.String("address", addr), zap.Error(err))
}
continue
}
@@ -84,38 +83,26 @@ func getClusterFromRemotePeers(lg *zap.Logger, urls []string, timeout time.Durat
resp.Body.Close()
if err != nil {
if logerr {
if lg != nil {
lg.Warn("failed to read body of cluster response", zap.String("address", addr), zap.Error(err))
} else {
plog.Warningf("could not read the body of cluster response: %v", err)
}
lg.Warn("failed to read body of cluster response", zap.String("address", addr), zap.Error(err))
}
continue
}
var membs []*membership.Member
if err = json.Unmarshal(b, &membs); err != nil {
if logerr {
if lg != nil {
lg.Warn("failed to unmarshal cluster response", zap.String("address", addr), zap.Error(err))
} else {
plog.Warningf("could not unmarshal cluster response: %v", err)
}
lg.Warn("failed to unmarshal cluster response", zap.String("address", addr), zap.Error(err))
}
continue
}
id, err := types.IDFromString(resp.Header.Get("X-Etcd-Cluster-ID"))
if err != nil {
if logerr {
if lg != nil {
lg.Warn(
"failed to parse cluster ID",
zap.String("address", addr),
zap.String("header", resp.Header.Get("X-Etcd-Cluster-ID")),
zap.Error(err),
)
} else {
plog.Warningf("could not parse the cluster ID from cluster res: %v", err)
}
lg.Warn(
"failed to parse cluster ID",
zap.String("address", addr),
zap.String("header", resp.Header.Get("X-Etcd-Cluster-ID")),
zap.Error(err),
)
}
continue
}
@@ -164,11 +151,7 @@ func getVersions(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt
}
ver, err := getVersion(lg, m, rt)
if err != nil {
if lg != nil {
lg.Warn("failed to get version", zap.String("remote-member-id", m.ID.String()), zap.Error(err))
} else {
plog.Warningf("cannot get the version of member %s (%v)", m.ID, err)
}
lg.Warn("failed to get version", zap.String("remote-member-id", m.ID.String()), zap.Error(err))
vers[m.ID.String()] = nil
} else {
vers[m.ID.String()] = ver
@@ -190,30 +173,21 @@ func decideClusterVersion(lg *zap.Logger, vers map[string]*version.Versions) *se
}
v, err := semver.NewVersion(ver.Server)
if err != nil {
if lg != nil {
lg.Warn(
"failed to parse server version of remote member",
zap.String("remote-member-id", mid),
zap.String("remote-member-version", ver.Server),
zap.Error(err),
)
} else {
plog.Errorf("cannot understand the version of member %s (%v)", mid, err)
}
lg.Warn(
"failed to parse server version of remote member",
zap.String("remote-member-id", mid),
zap.String("remote-member-version", ver.Server),
zap.Error(err),
)
return nil
}
if lv.LessThan(*v) {
if lg != nil {
lg.Warn(
"leader found higher-versioned member",
zap.String("local-member-version", lv.String()),
zap.String("remote-member-id", mid),
zap.String("remote-member-version", ver.Server),
)
} else {
plog.Warningf("the local etcd version %s is not up-to-date", lv.String())
plog.Warningf("member %s has a higher version %s", mid, ver.Server)
}
lg.Warn(
"leader found higher-versioned member",
zap.String("local-member-version", lv.String()),
zap.String("remote-member-id", mid),
zap.String("remote-member-version", ver.Server),
)
}
if cv == nil {
cv = v
@@ -253,42 +227,30 @@ func isCompatibleWithVers(lg *zap.Logger, vers map[string]*version.Versions, loc
}
clusterv, err := semver.NewVersion(v.Cluster)
if err != nil {
if lg != nil {
lg.Warn(
"failed to parse cluster version of remote member",
zap.String("remote-member-id", id),
zap.String("remote-member-cluster-version", v.Cluster),
zap.Error(err),
)
} else {
plog.Errorf("cannot understand the cluster version of member %s (%v)", id, err)
}
lg.Warn(
"failed to parse cluster version of remote member",
zap.String("remote-member-id", id),
zap.String("remote-member-cluster-version", v.Cluster),
zap.Error(err),
)
continue
}
if clusterv.LessThan(*minV) {
if lg != nil {
lg.Warn(
"cluster version of remote member is not compatible; too low",
zap.String("remote-member-id", id),
zap.String("remote-member-cluster-version", clusterv.String()),
zap.String("minimum-cluster-version-supported", minV.String()),
)
} else {
plog.Warningf("the running cluster version(%v) is lower than the minimal cluster version(%v) supported", clusterv.String(), minV.String())
}
lg.Warn(
"cluster version of remote member is not compatible; too low",
zap.String("remote-member-id", id),
zap.String("remote-member-cluster-version", clusterv.String()),
zap.String("minimum-cluster-version-supported", minV.String()),
)
return false
}
if maxV.LessThan(*clusterv) {
if lg != nil {
lg.Warn(
"cluster version of remote member is not compatible; too high",
zap.String("remote-member-id", id),
zap.String("remote-member-cluster-version", clusterv.String()),
zap.String("minimum-cluster-version-supported", minV.String()),
)
} else {
plog.Warningf("the running cluster version(%v) is higher than the maximum cluster version(%v) supported", clusterv.String(), maxV.String())
}
lg.Warn(
"cluster version of remote member is not compatible; too high",
zap.String("remote-member-id", id),
zap.String("remote-member-cluster-version", clusterv.String()),
zap.String("minimum-cluster-version-supported", minV.String()),
)
return false
}
ok = true
@@ -311,46 +273,34 @@ func getVersion(lg *zap.Logger, m *membership.Member, rt http.RoundTripper) (*ve
addr := u + "/version"
resp, err = cc.Get(addr)
if err != nil {
if lg != nil {
lg.Warn(
"failed to reach the peer URL",
zap.String("address", addr),
zap.String("remote-member-id", m.ID.String()),
zap.Error(err),
)
} else {
plog.Warningf("failed to reach the peerURL(%s) of member %s (%v)", u, m.ID, err)
}
lg.Warn(
"failed to reach the peer URL",
zap.String("address", addr),
zap.String("remote-member-id", m.ID.String()),
zap.Error(err),
)
continue
}
var b []byte
b, err = ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
if lg != nil {
lg.Warn(
"failed to read body of response",
zap.String("address", addr),
zap.String("remote-member-id", m.ID.String()),
zap.Error(err),
)
} else {
plog.Warningf("failed to read out the response body from the peerURL(%s) of member %s (%v)", u, m.ID, err)
}
lg.Warn(
"failed to read body of response",
zap.String("address", addr),
zap.String("remote-member-id", m.ID.String()),
zap.Error(err),
)
continue
}
var vers version.Versions
if err = json.Unmarshal(b, &vers); err != nil {
if lg != nil {
lg.Warn(
"failed to unmarshal response",
zap.String("address", addr),
zap.String("remote-member-id", m.ID.String()),
zap.Error(err),
)
} else {
plog.Warningf("failed to unmarshal the response body got from the peerURL(%s) of member %s (%v)", u, m.ID, err)
}
lg.Warn(
"failed to unmarshal response",
zap.String("address", addr),
zap.String("remote-member-id", m.ID.String()),
zap.Error(err),
)
continue
}
return &vers, nil


@@ -39,15 +39,11 @@ func (s *EtcdServer) CheckInitialHashKV() error {
lg := s.getLogger()
if lg != nil {
lg.Info(
"starting initial corruption check",
zap.String("local-member-id", s.ID().String()),
zap.Duration("timeout", s.Cfg.ReqTimeout()),
)
} else {
plog.Infof("%s starting initial corruption check with timeout %v...", s.ID(), s.Cfg.ReqTimeout())
}
lg.Info(
"starting initial corruption check",
zap.String("local-member-id", s.ID().String()),
zap.Duration("timeout", s.Cfg.ReqTimeout()),
)
h, rev, crev, err := s.kv.HashByRev(0)
if err != nil {
@@ -72,18 +68,10 @@ func (s *EtcdServer) CheckInitialHashKV() error {
if h != p.resp.Hash {
if crev == p.resp.CompactRevision {
if lg != nil {
lg.Warn("found different hash values from remote peer", fields...)
} else {
plog.Errorf("%s's hash %d != %s's hash %d (revision %d, peer revision %d, compact revision %d)", s.ID(), h, peerID, p.resp.Hash, rev, p.resp.Header.Revision, crev)
}
lg.Warn("found different hash values from remote peer", fields...)
mismatch++
} else {
if lg != nil {
lg.Warn("found different compact revision values from remote peer", fields...)
} else {
plog.Warningf("%s cannot check hash of peer(%s): peer has a different compact revision %d (revision:%d)", s.ID(), peerID, p.resp.CompactRevision, rev)
}
lg.Warn("found different compact revision values from remote peer", fields...)
}
}
@@ -93,35 +81,27 @@ func (s *EtcdServer) CheckInitialHashKV() error {
if p.err != nil {
switch p.err {
case rpctypes.ErrFutureRev:
if lg != nil {
lg.Warn(
"cannot fetch hash from slow remote peer",
zap.String("local-member-id", s.ID().String()),
zap.Int64("local-member-revision", rev),
zap.Int64("local-member-compact-revision", crev),
zap.Uint32("local-member-hash", h),
zap.String("remote-peer-id", p.id.String()),
zap.Strings("remote-peer-endpoints", p.eps),
zap.Error(err),
)
} else {
plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: peer is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error())
}
lg.Warn(
"cannot fetch hash from slow remote peer",
zap.String("local-member-id", s.ID().String()),
zap.Int64("local-member-revision", rev),
zap.Int64("local-member-compact-revision", crev),
zap.Uint32("local-member-hash", h),
zap.String("remote-peer-id", p.id.String()),
zap.Strings("remote-peer-endpoints", p.eps),
zap.Error(err),
)
case rpctypes.ErrCompacted:
if lg != nil {
lg.Warn(
"cannot fetch hash from remote peer; local member is behind",
zap.String("local-member-id", s.ID().String()),
zap.Int64("local-member-revision", rev),
zap.Int64("local-member-compact-revision", crev),
zap.Uint32("local-member-hash", h),
zap.String("remote-peer-id", p.id.String()),
zap.Strings("remote-peer-endpoints", p.eps),
zap.Error(err),
)
} else {
plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: local node is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error())
}
lg.Warn(
"cannot fetch hash from remote peer; local member is behind",
zap.String("local-member-id", s.ID().String()),
zap.Int64("local-member-revision", rev),
zap.Int64("local-member-compact-revision", crev),
zap.Uint32("local-member-hash", h),
zap.String("remote-peer-id", p.id.String()),
zap.Strings("remote-peer-endpoints", p.eps),
zap.Error(err),
)
}
}
}
@@ -129,14 +109,10 @@ func (s *EtcdServer) CheckInitialHashKV() error {
return fmt.Errorf("%s found data inconsistency with peers", s.ID())
}
if lg != nil {
lg.Info(
"initial corruption checking passed; no corruption",
zap.String("local-member-id", s.ID().String()),
)
} else {
plog.Infof("%s succeeded on initial corruption checking: no corruption", s.ID())
}
lg.Info(
"initial corruption checking passed; no corruption",
zap.String("local-member-id", s.ID().String()),
)
return nil
}
@@ -147,15 +123,11 @@ func (s *EtcdServer) monitorKVHash() {
}
lg := s.getLogger()
if lg != nil {
lg.Info(
"enabled corruption checking",
zap.String("local-member-id", s.ID().String()),
zap.Duration("interval", t),
)
} else {
plog.Infof("enabled corruption checking with %s interval", t)
}
lg.Info(
"enabled corruption checking",
zap.String("local-member-id", s.ID().String()),
zap.Duration("interval", t),
)
for {
select {
@@ -167,11 +139,7 @@ func (s *EtcdServer) monitorKVHash() {
continue
}
if err := s.checkHashKV(); err != nil {
if lg != nil {
lg.Warn("failed to check hash KV", zap.Error(err))
} else {
plog.Debugf("check hash kv failed %v", err)
}
lg.Warn("failed to check hash KV", zap.Error(err))
}
}
}
@@ -214,19 +182,15 @@ func (s *EtcdServer) checkHashKV() error {
}
if h2 != h && rev2 == rev && crev == crev2 {
if lg != nil {
lg.Warn(
"found hash mismatch",
zap.Int64("revision-1", rev),
zap.Int64("compact-revision-1", crev),
zap.Uint32("hash-1", h),
zap.Int64("revision-2", rev2),
zap.Int64("compact-revision-2", crev2),
zap.Uint32("hash-2", h2),
)
} else {
plog.Warningf("mismatched hashes %d and %d for revision %d", h, h2, rev)
}
lg.Warn(
"found hash mismatch",
zap.Int64("revision-1", rev),
zap.Int64("compact-revision-1", crev),
zap.Uint32("hash-1", h),
zap.Int64("revision-2", rev2),
zap.Int64("compact-revision-2", crev2),
zap.Uint32("hash-2", h2),
)
mismatch(uint64(s.ID()))
}
@@ -238,63 +202,36 @@ func (s *EtcdServer) checkHashKV() error {
// leader expects follower's latest revision less than or equal to leader's
if p.resp.Header.Revision > rev2 {
if lg != nil {
lg.Warn(
"revision from follower must be less than or equal to leader's",
zap.Int64("leader-revision", rev2),
zap.Int64("follower-revision", p.resp.Header.Revision),
zap.String("follower-peer-id", types.ID(id).String()),
)
} else {
plog.Warningf(
"revision %d from member %v, expected at most %d",
p.resp.Header.Revision,
types.ID(id),
rev2)
}
lg.Warn(
"revision from follower must be less than or equal to leader's",
zap.Int64("leader-revision", rev2),
zap.Int64("follower-revision", p.resp.Header.Revision),
zap.String("follower-peer-id", types.ID(id).String()),
)
mismatch(id)
}
// leader expects follower's latest compact revision less than or equal to leader's
if p.resp.CompactRevision > crev2 {
if lg != nil {
lg.Warn(
"compact revision from follower must be less than or equal to leader's",
zap.Int64("leader-compact-revision", crev2),
zap.Int64("follower-compact-revision", p.resp.CompactRevision),
zap.String("follower-peer-id", types.ID(id).String()),
)
} else {
plog.Warningf(
"compact revision %d from member %v, expected at most %d",
p.resp.CompactRevision,
types.ID(id),
crev2,
)
}
lg.Warn(
"compact revision from follower must be less than or equal to leader's",
zap.Int64("leader-compact-revision", crev2),
zap.Int64("follower-compact-revision", p.resp.CompactRevision),
zap.String("follower-peer-id", types.ID(id).String()),
)
mismatch(id)
}
// follower's compact revision is leader's old one, then hashes must match
if p.resp.CompactRevision == crev && p.resp.Hash != h {
if lg != nil {
lg.Warn(
"same compact revision then hashes must match",
zap.Int64("leader-compact-revision", crev2),
zap.Uint32("leader-hash", h),
zap.Int64("follower-compact-revision", p.resp.CompactRevision),
zap.Uint32("follower-hash", p.resp.Hash),
zap.String("follower-peer-id", types.ID(id).String()),
)
} else {
plog.Warningf(
"hash %d at revision %d from member %v, expected hash %d",
p.resp.Hash,
rev,
types.ID(id),
h,
)
}
lg.Warn(
"same compact revision then hashes must match",
zap.Int64("leader-compact-revision", crev2),
zap.Uint32("leader-hash", h),
zap.Int64("follower-compact-revision", p.resp.CompactRevision),
zap.Uint32("follower-hash", p.resp.Hash),
zap.String("follower-peer-id", types.ID(id).String()),
)
mismatch(id)
}
}
@@ -332,17 +269,13 @@ func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*peerHashKVResp) {
Endpoints: p.eps,
})
if cerr != nil {
if lg != nil {
lg.Warn(
"failed to create client to peer URL",
zap.String("local-member-id", s.ID().String()),
zap.String("remote-peer-id", p.id.String()),
zap.Strings("remote-peer-endpoints", p.eps),
zap.Error(cerr),
)
} else {
plog.Warningf("%s failed to create client to peer %q for hash checking (%q)", s.ID(), p.eps, cerr.Error())
}
lg.Warn(
"failed to create client to peer URL",
zap.String("local-member-id", s.ID().String()),
zap.String("remote-peer-id", p.id.String()),
zap.Strings("remote-peer-endpoints", p.eps),
zap.Error(cerr),
)
continue
}
@@ -356,17 +289,13 @@ func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*peerHashKVResp) {
resps = append(resps, &peerHashKVResp{id: p.id, eps: p.eps, resp: resp, err: nil})
break
}
if lg != nil {
lg.Warn(
"failed hash kv request",
zap.String("local-member-id", s.ID().String()),
zap.Int64("requested-revision", rev),
zap.String("remote-peer-endpoint", c),
zap.Error(cerr),
)
} else {
plog.Warningf("%s hash-kv error %q on peer %q with revision %d", s.ID(), cerr.Error(), c, rev)
}
lg.Warn(
"failed hash kv request",
zap.String("local-member-id", s.ID().String()),
zap.Int64("requested-revision", rev),
zap.String("remote-peer-endpoint", c),
zap.Error(cerr),
)
}
cli.Close()


@@ -189,28 +189,16 @@ func monitorFileDescriptor(lg *zap.Logger, done <-chan struct{}) {
for {
used, err := runtime.FDUsage()
if err != nil {
if lg != nil {
lg.Warn("failed to get file descriptor usage", zap.Error(err))
} else {
plog.Errorf("cannot monitor file descriptor usage (%v)", err)
}
lg.Warn("failed to get file descriptor usage", zap.Error(err))
return
}
limit, err := runtime.FDLimit()
if err != nil {
if lg != nil {
lg.Warn("failed to get file descriptor limit", zap.Error(err))
} else {
plog.Errorf("cannot monitor file descriptor usage (%v)", err)
}
lg.Warn("failed to get file descriptor limit", zap.Error(err))
return
}
if used >= limit/5*4 {
if lg != nil {
lg.Warn("80% of file descriptors are used", zap.Uint64("used", used), zap.Uint64("limit", limit))
} else {
plog.Warningf("80%% of the file descriptor limit is used [used = %d, limit = %d]", used, limit)
}
lg.Warn("80% of file descriptors are used", zap.Uint64("used", used), zap.Uint64("limit", limit))
}
select {
case <-ticker.C:


@@ -78,15 +78,11 @@ func NewBackendQuota(s *EtcdServer, name string) Quota {
if s.Cfg.QuotaBackendBytes < 0 {
// disable quotas if negative
quotaLogOnce.Do(func() {
if lg != nil {
lg.Info(
"disabled backend quota",
zap.String("quota-name", name),
zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes),
)
} else {
plog.Warningf("disabling backend quota")
}
lg.Info(
"disabled backend quota",
zap.String("quota-name", name),
zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes),
)
})
return &passthroughQuota{}
}
@@ -109,27 +105,21 @@ func NewBackendQuota(s *EtcdServer, name string) Quota {
quotaLogOnce.Do(func() {
if s.Cfg.QuotaBackendBytes > MaxQuotaBytes {
if lg != nil {
lg.Warn(
"quota exceeds the maximum value",
zap.String("quota-name", name),
zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes),
zap.String("quota-size", humanize.Bytes(uint64(s.Cfg.QuotaBackendBytes))),
zap.Int64("quota-maximum-size-bytes", MaxQuotaBytes),
zap.String("quota-maximum-size", maxQuotaSize),
)
} else {
plog.Warningf("backend quota %v exceeds maximum recommended quota %v", s.Cfg.QuotaBackendBytes, MaxQuotaBytes)
}
}
if lg != nil {
lg.Info(
"enabled backend quota",
lg.Warn(
"quota exceeds the maximum value",
zap.String("quota-name", name),
zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes),
zap.String("quota-size", humanize.Bytes(uint64(s.Cfg.QuotaBackendBytes))),
zap.Int64("quota-maximum-size-bytes", MaxQuotaBytes),
zap.String("quota-maximum-size", maxQuotaSize),
)
}
lg.Info(
"enabled backend quota",
zap.String("quota-name", name),
zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes),
zap.String("quota-size", humanize.Bytes(uint64(s.Cfg.QuotaBackendBytes))),
)
})
return &backendQuota{s, s.Cfg.QuotaBackendBytes}
}


@@ -198,11 +198,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
select {
case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
case <-time.After(internalTimeout):
if r.lg != nil {
r.lg.Warn("timed out sending read state", zap.Duration("timeout", internalTimeout))
} else {
plog.Warningf("timed out sending read state")
}
r.lg.Warn("timed out sending read state", zap.Duration("timeout", internalTimeout))
case <-r.stopped:
return
}
@@ -233,11 +229,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
// gofail: var raftBeforeSave struct{}
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
if r.lg != nil {
r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
} else {
plog.Fatalf("raft save state and entries error: %v", err)
}
r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
}
if !raft.IsEmptyHardState(rd.HardState) {
proposalsCommitted.Set(float64(rd.HardState.Commit))
@@ -247,22 +239,14 @@ func (r *raftNode) start(rh *raftReadyHandler) {
if !raft.IsEmptySnap(rd.Snapshot) {
// gofail: var raftBeforeSaveSnap struct{}
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
if r.lg != nil {
r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
} else {
plog.Fatalf("raft save snapshot error: %v", err)
}
r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
}
// etcdserver now claim the snapshot has been persisted onto the disk
notifyc <- struct{}{}
// gofail: var raftAfterSaveSnap struct{}
r.raftStorage.ApplySnapshot(rd.Snapshot)
if r.lg != nil {
r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
} else {
plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
}
r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
// gofail: var raftAfterApplySnap struct{}
}
@@ -359,18 +343,13 @@ func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
ok, exceed := r.td.Observe(ms[i].To)
if !ok {
// TODO: limit request rate.
if r.lg != nil {
r.lg.Warn(
"leader failed to send out heartbeat on time; took too long, leader is overloaded likely from slow disk",
zap.String("to", fmt.Sprintf("%x", ms[i].To)),
zap.Duration("heartbeat-interval", r.heartbeat),
zap.Duration("expected-duration", 2*r.heartbeat),
zap.Duration("exceeded-duration", exceed),
)
} else {
plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v, to %x)", r.heartbeat, exceed, ms[i].To)
plog.Warningf("server is likely overloaded")
}
r.lg.Warn(
"leader failed to send out heartbeat on time; took too long, leader is overloaded likely from slow disk",
zap.String("to", fmt.Sprintf("%x", ms[i].To)),
zap.Duration("heartbeat-interval", r.heartbeat),
zap.Duration("expected-duration", 2*r.heartbeat),
zap.Duration("exceeded-duration", exceed),
)
heartbeatSendFailures.Inc()
}
}
@@ -392,11 +371,7 @@ func (r *raftNode) onStop() {
r.ticker.Stop()
r.transport.Stop()
if err := r.storage.Close(); err != nil {
if r.lg != nil {
r.lg.Panic("failed to close Raft storage", zap.Error(err))
} else {
plog.Panicf("raft close storage error: %v", err)
}
r.lg.Panic("failed to close Raft storage", zap.Error(err))
}
close(r.done)
}
@@ -432,35 +407,23 @@ func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id
},
)
if w, err = wal.Create(cfg.Logger, cfg.WALDir(), metadata); err != nil {
if cfg.Logger != nil {
cfg.Logger.Panic("failed to create WAL", zap.Error(err))
} else {
plog.Panicf("create wal error: %v", err)
}
cfg.Logger.Panic("failed to create WAL", zap.Error(err))
}
peers := make([]raft.Peer, len(ids))
for i, id := range ids {
var ctx []byte
ctx, err = json.Marshal((*cl).Member(id))
if err != nil {
if cfg.Logger != nil {
cfg.Logger.Panic("failed to marshal member", zap.Error(err))
} else {
plog.Panicf("marshal member should never fail: %v", err)
}
cfg.Logger.Panic("failed to marshal member", zap.Error(err))
}
peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
}
id = member.ID
if cfg.Logger != nil {
cfg.Logger.Info(
"starting local member",
zap.String("local-member-id", id.String()),
zap.String("cluster-id", cl.ID().String()),
)
} else {
plog.Infof("starting member %s in cluster %s", id, cl.ID())
}
cfg.Logger.Info(
"starting local member",
zap.String("local-member-id", id.String()),
zap.String("cluster-id", cl.ID().String()),
)
s = raft.NewMemoryStorage()
c := &raft.Config{
ID: uint64(id),
@@ -502,16 +465,12 @@ func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *member
}
w, id, cid, st, ents := readWAL(cfg.Logger, cfg.WALDir(), walsnap)
if cfg.Logger != nil {
cfg.Logger.Info(
"restarting local member",
zap.String("cluster-id", cid.String()),
zap.String("local-member-id", id.String()),
zap.Uint64("commit-index", st.Commit),
)
} else {
plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit)
}
cfg.Logger.Info(
"restarting local member",
zap.String("cluster-id", cid.String()),
zap.String("local-member-id", id.String()),
zap.Uint64("commit-index", st.Commit),
)
cl := membership.NewCluster(cfg.Logger, "")
cl.SetID(id, cid)
s := raft.NewMemoryStorage()
@@ -560,16 +519,12 @@ func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types
// discard the previously uncommitted entries
for i, ent := range ents {
if ent.Index > st.Commit {
if cfg.Logger != nil {
cfg.Logger.Info(
"discarding uncommitted WAL entries",
zap.Uint64("entry-index", ent.Index),
zap.Uint64("commit-index-from-wal", st.Commit),
zap.Int("number-of-discarded-entries", len(ents)-i),
)
} else {
plog.Infof("discarding %d uncommitted WAL entries ", len(ents)-i)
}
cfg.Logger.Info(
"discarding uncommitted WAL entries",
zap.Uint64("entry-index", ent.Index),
zap.Uint64("commit-index-from-wal", st.Commit),
zap.Int("number-of-discarded-entries", len(ents)-i),
)
ents = ents[:i]
break
}
@@ -588,26 +543,18 @@ func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types
// force commit newly appended entries
err := w.Save(raftpb.HardState{}, toAppEnts)
if err != nil {
if cfg.Logger != nil {
cfg.Logger.Fatal("failed to save hard state and entries", zap.Error(err))
} else {
plog.Fatalf("%v", err)
}
cfg.Logger.Fatal("failed to save hard state and entries", zap.Error(err))
}
if len(ents) != 0 {
st.Commit = ents[len(ents)-1].Index
}
if cfg.Logger != nil {
cfg.Logger.Info(
"forcing restart member",
zap.String("cluster-id", cid.String()),
zap.String("local-member-id", id.String()),
zap.Uint64("commit-index", st.Commit),
)
} else {
plog.Printf("forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit)
}
cfg.Logger.Info(
"forcing restart member",
zap.String("cluster-id", cid.String()),
zap.String("local-member-id", id.String()),
zap.Uint64("commit-index", st.Commit),
)
cl := membership.NewCluster(cfg.Logger, "")
cl.SetID(id, cid)
@@ -670,11 +617,7 @@ func getIDs(lg *zap.Logger, snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64
case raftpb.ConfChangeUpdateNode:
// do nothing
default:
if lg != nil {
lg.Panic("unknown ConfChange Type", zap.String("type", cc.Type.String()))
} else {
plog.Panicf("ConfChange Type should be either ConfChangeAddNode or ConfChangeRemoveNode!")
}
lg.Panic("unknown ConfChange Type", zap.String("type", cc.Type.String()))
}
}
sids := make(types.Uint64Slice, 0, len(ids))
@@ -710,11 +653,7 @@ func createConfigChangeEnts(lg *zap.Logger, ids []uint64, self uint64, term, ind
}
ctx, err := json.Marshal(m)
if err != nil {
if lg != nil {
lg.Panic("failed to marshal member", zap.Error(err))
} else {
plog.Panicf("marshal member should never fail: %v", err)
}
lg.Panic("failed to marshal member", zap.Error(err))
}
cc := &raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,

File diff suppressed because it is too large.


@@ -29,22 +29,19 @@ import (
// a snapshot of v2 store inside raft.Snapshot as []byte, a snapshot of v3 KV in the top level message
// as ReadCloser.
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message {
lg := s.getLogger()
// get a snapshot of v2 store as []byte
clone := s.v2store.Clone()
d, err := clone.SaveNoCopy()
if err != nil {
if lg := s.getLogger(); lg != nil {
lg.Panic("failed to save v2 store data", zap.Error(err))
} else {
plog.Panicf("store save should never fail: %v", err)
}
lg.Panic("failed to save v2 store data", zap.Error(err))
}
// commit kv to write metadata(for example: consistent index).
s.KV().Commit()
dbsnap := s.be.Snapshot()
// get a snapshot of v3 KV as readCloser
rc := newSnapshotReaderCloser(s.getLogger(), dbsnap)
rc := newSnapshotReaderCloser(lg, dbsnap)
// put the []byte snapshot of store into raft snapshot and return the merged snapshot with
// KV readCloser snapshot.
@@ -66,34 +63,22 @@ func newSnapshotReaderCloser(lg *zap.Logger, snapshot backend.Snapshot) io.ReadC
go func() {
n, err := snapshot.WriteTo(pw)
if err == nil {
if lg != nil {
lg.Info(
"sent database snapshot to writer",
zap.Int64("bytes", n),
zap.String("size", humanize.Bytes(uint64(n))),
)
} else {
plog.Infof("wrote database snapshot out [total bytes: %d]", n)
}
lg.Info(
"sent database snapshot to writer",
zap.Int64("bytes", n),
zap.String("size", humanize.Bytes(uint64(n))),
)
} else {
if lg != nil {
lg.Warn(
"failed to send database snapshot to writer",
zap.String("size", humanize.Bytes(uint64(n))),
zap.Error(err),
)
} else {
plog.Warningf("failed to write database snapshot out [written bytes: %d]: %v", n, err)
}
lg.Warn(
"failed to send database snapshot to writer",
zap.String("size", humanize.Bytes(uint64(n))),
zap.Error(err),
)
}
pw.CloseWithError(err)
err = snapshot.Close()
if err != nil {
if lg != nil {
lg.Panic("failed to close database snapshot", zap.Error(err))
} else {
plog.Panicf("failed to close database snapshot: %v", err)
}
lg.Panic("failed to close database snapshot", zap.Error(err))
}
}()
return pr


@@ -74,34 +74,18 @@ func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot) (w *wal.WAL, id
repaired := false
for {
if w, err = wal.Open(lg, waldir, snap); err != nil {
if lg != nil {
lg.Fatal("failed to open WAL", zap.Error(err))
} else {
plog.Fatalf("open wal error: %v", err)
}
lg.Fatal("failed to open WAL", zap.Error(err))
}
if wmetadata, st, ents, err = w.ReadAll(); err != nil {
w.Close()
// we can only repair ErrUnexpectedEOF and we never repair twice.
if repaired || err != io.ErrUnexpectedEOF {
if lg != nil {
lg.Fatal("failed to read WAL, cannot be repaired", zap.Error(err))
} else {
plog.Fatalf("read wal error (%v) and cannot be repaired", err)
}
lg.Fatal("failed to read WAL, cannot be repaired", zap.Error(err))
}
if !wal.Repair(lg, waldir) {
if lg != nil {
lg.Fatal("failed to repair WAL", zap.Error(err))
} else {
plog.Fatalf("WAL error (%v) cannot be repaired", err)
}
lg.Fatal("failed to repair WAL", zap.Error(err))
} else {
if lg != nil {
lg.Info("repaired WAL", zap.Error(err))
} else {
plog.Infof("repaired WAL error (%v)", err)
}
lg.Info("repaired WAL", zap.Error(err))
repaired = true
}
continue


@@ -140,25 +140,15 @@ func warnOfExpensiveReadOnlyRangeRequest(lg *zap.Logger, now time.Time, reqStrin
func warnOfExpensiveGenericRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) {
d := time.Since(now)
if d > warnApplyDuration {
if lg != nil {
lg.Warn(
"apply request took too long",
zap.Duration("took", d),
zap.Duration("expected-duration", warnApplyDuration),
zap.String("prefix", prefix),
zap.String("request", reqStringer.String()),
zap.String("response", resp),
zap.Error(err),
)
} else {
var result string
if err != nil {
result = fmt.Sprintf("error:%v", err)
} else {
result = resp
}
plog.Warningf("%srequest %q with result %q took too long (%v) to execute", prefix, reqStringer.String(), result, d)
}
lg.Warn(
"apply request took too long",
zap.Duration("took", d),
zap.Duration("expected-duration", warnApplyDuration),
zap.String("prefix", prefix),
zap.String("request", reqStringer.String()),
zap.String("response", resp),
zap.Error(err),
)
slowApplies.Inc()
}
}


@@ -419,15 +419,11 @@ func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest
checkedRevision, err := s.AuthStore().CheckPassword(r.Name, r.Password)
if err != nil {
if err != auth.ErrAuthNotEnabled {
if lg != nil {
lg.Warn(
"invalid authentication was requested",
zap.String("user", r.Name),
zap.Error(err),
)
} else {
plog.Errorf("invalid authentication request to user %s was issued", r.Name)
}
lg.Warn(
"invalid authentication was requested",
zap.String("user", r.Name),
zap.Error(err),
)
}
return nil, err
}
@@ -451,11 +447,7 @@ func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest
break
}
if lg != nil {
lg.Info("revision when password checked became stale; retrying")
} else {
plog.Infof("revision when password checked is obsolete, retrying")
}
lg.Info("revision when password checked became stale; retrying")
}
return resp.(*pb.AuthenticateResponse), nil
@@ -707,11 +699,7 @@ func (s *EtcdServer) linearizableReadLoop() {
if err == raft.ErrStopped {
return
}
if lg != nil {
lg.Warn("failed to get read index from Raft", zap.Error(err))
} else {
plog.Errorf("failed to get read index from raft: %v", err)
}
lg.Warn("failed to get read index from Raft", zap.Error(err))
readIndexFailed.Inc()
nr.notify(err)
continue
@@ -733,15 +721,11 @@ func (s *EtcdServer) linearizableReadLoop() {
if len(rs.RequestCtx) == 8 {
id2 = binary.BigEndian.Uint64(rs.RequestCtx)
}
if lg != nil {
lg.Warn(
"ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
zap.Uint64("sent-request-id", id1),
zap.Uint64("received-request-id", id2),
)
} else {
plog.Warningf("ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader (request ID want %d, got %d)", id1, id2)
}
lg.Warn(
"ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
zap.Uint64("sent-request-id", id1),
zap.Uint64("received-request-id", id2),
)
slowReadIndex.Inc()
}
case <-leaderChangedNotifier:
@@ -750,11 +734,7 @@ func (s *EtcdServer) linearizableReadLoop() {
// return a retryable error.
nr.notify(ErrLeaderChanged)
case <-time.After(s.Cfg.ReqTimeout()):
if lg != nil {
lg.Warn("timed out waiting for read index response (local node might have slow network)", zap.Duration("timeout", s.Cfg.ReqTimeout()))
} else {
plog.Warningf("timed out waiting for read index response (local node might have slow network)")
}
lg.Warn("timed out waiting for read index response (local node might have slow network)", zap.Duration("timeout", s.Cfg.ReqTimeout()))
nr.notify(ErrTimeout)
timeout = true
slowReadIndex.Inc()