mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #12775 from ptabor/20210314-zip
etcd-raft-zap logger fixes.
This commit is contained in:
commit
e599f4a482
@ -136,9 +136,9 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
|
|||||||
defer func(start time.Time) {
|
defer func(start time.Time) {
|
||||||
success := ar.err == nil || ar.err == mvcc.ErrCompacted
|
success := ar.err == nil || ar.err == mvcc.ErrCompacted
|
||||||
applySec.WithLabelValues(v3Version, op, strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
|
applySec.WithLabelValues(v3Version, op, strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
|
||||||
warnOfExpensiveRequest(a.s.getLogger(), a.s.Cfg.WarningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
|
warnOfExpensiveRequest(a.s.Logger(), a.s.Cfg.WarningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
|
||||||
if !success {
|
if !success {
|
||||||
warnOfFailedRequest(a.s.getLogger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
|
warnOfFailedRequest(a.s.Logger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
|
||||||
}
|
}
|
||||||
}(time.Now())
|
}(time.Now())
|
||||||
|
|
||||||
@ -243,7 +243,7 @@ func (a *applierV3backend) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.Put
|
|||||||
// create put tracing if the trace in context is empty
|
// create put tracing if the trace in context is empty
|
||||||
if trace.IsEmpty() {
|
if trace.IsEmpty() {
|
||||||
trace = traceutil.New("put",
|
trace = traceutil.New("put",
|
||||||
a.s.getLogger(),
|
a.s.Logger(),
|
||||||
traceutil.Field{Key: "key", Value: string(p.Key)},
|
traceutil.Field{Key: "key", Value: string(p.Key)},
|
||||||
traceutil.Field{Key: "req_size", Value: proto.Size(p)},
|
traceutil.Field{Key: "req_size", Value: proto.Size(p)},
|
||||||
)
|
)
|
||||||
@ -420,7 +420,7 @@ func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Ra
|
|||||||
func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
|
func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
|
||||||
trace := traceutil.Get(ctx)
|
trace := traceutil.Get(ctx)
|
||||||
if trace.IsEmpty() {
|
if trace.IsEmpty() {
|
||||||
trace = traceutil.New("transaction", a.s.getLogger())
|
trace = traceutil.New("transaction", a.s.Logger())
|
||||||
ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
|
ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
|
||||||
}
|
}
|
||||||
isWrite := !isTxnReadonly(rt)
|
isWrite := !isTxnReadonly(rt)
|
||||||
@ -606,7 +606,7 @@ func (a *applierV3backend) applyTxn(ctx context.Context, txn mvcc.TxnWrite, rt *
|
|||||||
reqs = rt.Failure
|
reqs = rt.Failure
|
||||||
}
|
}
|
||||||
|
|
||||||
lg := a.s.getLogger()
|
lg := a.s.Logger()
|
||||||
for i, req := range reqs {
|
for i, req := range reqs {
|
||||||
respi := tresp.Responses[i].Response
|
respi := tresp.Responses[i].Response
|
||||||
switch tv := req.Request.(type) {
|
switch tv := req.Request.(type) {
|
||||||
@ -654,7 +654,7 @@ func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.Com
|
|||||||
resp := &pb.CompactionResponse{}
|
resp := &pb.CompactionResponse{}
|
||||||
resp.Header = &pb.ResponseHeader{}
|
resp.Header = &pb.ResponseHeader{}
|
||||||
trace := traceutil.New("compact",
|
trace := traceutil.New("compact",
|
||||||
a.s.getLogger(),
|
a.s.Logger(),
|
||||||
traceutil.Field{Key: "revision", Value: compaction.Revision},
|
traceutil.Field{Key: "revision", Value: compaction.Revision},
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -698,7 +698,7 @@ func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error)
|
|||||||
resp := &pb.AlarmResponse{}
|
resp := &pb.AlarmResponse{}
|
||||||
oldCount := len(a.s.alarmStore.Get(ar.Alarm))
|
oldCount := len(a.s.alarmStore.Get(ar.Alarm))
|
||||||
|
|
||||||
lg := a.s.getLogger()
|
lg := a.s.Logger()
|
||||||
switch ar.Action {
|
switch ar.Action {
|
||||||
case pb.AlarmRequest_GET:
|
case pb.AlarmRequest_GET:
|
||||||
resp.Alarms = a.s.alarmStore.Get(ar.Alarm)
|
resp.Alarms = a.s.alarmStore.Get(ar.Alarm)
|
||||||
|
@ -120,7 +120,7 @@ func (s *EtcdServer) applyV2Request(r *RequestV2) (resp Response) {
|
|||||||
defer func(start time.Time) {
|
defer func(start time.Time) {
|
||||||
success := resp.Err == nil
|
success := resp.Err == nil
|
||||||
applySec.WithLabelValues(v2Version, r.Method, strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
|
applySec.WithLabelValues(v2Version, r.Method, strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
|
||||||
warnOfExpensiveRequest(s.getLogger(), s.Cfg.WarningApplyDuration, start, stringer, nil, nil)
|
warnOfExpensiveRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, stringer, nil, nil)
|
||||||
}(time.Now())
|
}(time.Now())
|
||||||
|
|
||||||
switch r.Method {
|
switch r.Method {
|
||||||
|
@ -41,7 +41,7 @@ func (s *EtcdServer) CheckInitialHashKV() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
|
|
||||||
lg.Info(
|
lg.Info(
|
||||||
"starting initial corruption check",
|
"starting initial corruption check",
|
||||||
@ -126,7 +126,7 @@ func (s *EtcdServer) monitorKVHash() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
lg.Info(
|
lg.Info(
|
||||||
"enabled corruption checking",
|
"enabled corruption checking",
|
||||||
zap.String("local-member-id", s.ID().String()),
|
zap.String("local-member-id", s.ID().String()),
|
||||||
@ -149,7 +149,7 @@ func (s *EtcdServer) monitorKVHash() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *EtcdServer) checkHashKV() error {
|
func (s *EtcdServer) checkHashKV() error {
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
|
|
||||||
h, rev, crev, err := s.kv.HashByRev(0)
|
h, rev, crev, err := s.kv.HashByRev(0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -268,7 +268,7 @@ func (s *EtcdServer) getPeerHashKVs(rev int64) []*peerHashKVResp {
|
|||||||
peers = append(peers, peerInfo{id: m.ID, eps: m.PeerURLs})
|
peers = append(peers, peerInfo{id: m.ID, eps: m.PeerURLs})
|
||||||
}
|
}
|
||||||
|
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
|
|
||||||
var resps []*peerHashKVResp
|
var resps []*peerHashKVResp
|
||||||
for _, p := range peers {
|
for _, p := range peers {
|
||||||
@ -345,7 +345,7 @@ type hashKVHandler struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *EtcdServer) HashKVHandler() http.Handler {
|
func (s *EtcdServer) HashKVHandler() http.Handler {
|
||||||
return &hashKVHandler{lg: s.getLogger(), server: s}
|
return &hashKVHandler{lg: s.Logger(), server: s}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
@ -72,7 +72,7 @@ var (
|
|||||||
|
|
||||||
// NewBackendQuota creates a quota layer with the given storage limit.
|
// NewBackendQuota creates a quota layer with the given storage limit.
|
||||||
func NewBackendQuota(s *EtcdServer, name string) Quota {
|
func NewBackendQuota(s *EtcdServer, name string) Quota {
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
quotaBackendBytes.Set(float64(s.Cfg.QuotaBackendBytes))
|
quotaBackendBytes.Set(float64(s.Cfg.QuotaBackendBytes))
|
||||||
|
|
||||||
if s.Cfg.QuotaBackendBytes < 0 {
|
if s.Cfg.QuotaBackendBytes < 0 {
|
||||||
|
@ -461,17 +461,7 @@ func startNode(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.
|
|||||||
CheckQuorum: true,
|
CheckQuorum: true,
|
||||||
PreVote: cfg.PreVote,
|
PreVote: cfg.PreVote,
|
||||||
}
|
}
|
||||||
if cfg.Logger != nil {
|
c.Logger, err = getRaftLogger(cfg)
|
||||||
// called after capnslog setting in "init" function
|
|
||||||
if cfg.LoggerConfig != nil {
|
|
||||||
c.Logger, err = NewRaftLogger(cfg.LoggerConfig)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("cannot create raft logger %v", err)
|
|
||||||
}
|
|
||||||
} else if cfg.LoggerCore != nil && cfg.LoggerWriteSyncer != nil {
|
|
||||||
c.Logger = NewRaftLoggerFromZapCore(cfg.LoggerCore, cfg.LoggerWriteSyncer)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(peers) == 0 {
|
if len(peers) == 0 {
|
||||||
n = raft.RestartNode(c)
|
n = raft.RestartNode(c)
|
||||||
@ -515,18 +505,11 @@ func restartNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (types.ID,
|
|||||||
CheckQuorum: true,
|
CheckQuorum: true,
|
||||||
PreVote: cfg.PreVote,
|
PreVote: cfg.PreVote,
|
||||||
}
|
}
|
||||||
if cfg.Logger != nil {
|
|
||||||
// called after capnslog setting in "init" function
|
|
||||||
var err error
|
var err error
|
||||||
if cfg.LoggerConfig != nil {
|
c.Logger, err = getRaftLogger(cfg)
|
||||||
c.Logger, err = NewRaftLogger(cfg.LoggerConfig)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("cannot create raft logger %v", err)
|
log.Fatalf("cannot create raft logger %v", err)
|
||||||
}
|
}
|
||||||
} else if cfg.LoggerCore != nil && cfg.LoggerWriteSyncer != nil {
|
|
||||||
c.Logger = NewRaftLoggerFromZapCore(cfg.LoggerCore, cfg.LoggerWriteSyncer)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
n := raft.RestartNode(c)
|
n := raft.RestartNode(c)
|
||||||
raftStatusMu.Lock()
|
raftStatusMu.Lock()
|
||||||
@ -600,23 +583,31 @@ func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot)
|
|||||||
CheckQuorum: true,
|
CheckQuorum: true,
|
||||||
PreVote: cfg.PreVote,
|
PreVote: cfg.PreVote,
|
||||||
}
|
}
|
||||||
if cfg.Logger != nil {
|
|
||||||
// called after capnslog setting in "init" function
|
c.Logger, err = getRaftLogger(cfg)
|
||||||
if cfg.LoggerConfig != nil {
|
|
||||||
c.Logger, err = NewRaftLogger(cfg.LoggerConfig)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("cannot create raft logger %v", err)
|
log.Fatalf("cannot create raft logger %v", err)
|
||||||
}
|
}
|
||||||
} else if cfg.LoggerCore != nil && cfg.LoggerWriteSyncer != nil {
|
|
||||||
c.Logger = NewRaftLoggerFromZapCore(cfg.LoggerCore, cfg.LoggerWriteSyncer)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
n := raft.RestartNode(c)
|
n := raft.RestartNode(c)
|
||||||
raftStatus = n.Status
|
raftStatus = n.Status
|
||||||
return id, cl, n, s, w
|
return id, cl, n, s, w
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getRaftLogger(cfg config.ServerConfig) (raft.Logger, error) {
|
||||||
|
if cfg.Logger != nil {
|
||||||
|
// called after capnslog setting in "init" function
|
||||||
|
if cfg.LoggerConfig != nil {
|
||||||
|
return NewRaftLogger(cfg.LoggerConfig)
|
||||||
|
} else if cfg.LoggerCore != nil && cfg.LoggerWriteSyncer != nil {
|
||||||
|
return NewRaftLoggerFromZapCore(cfg.LoggerCore, cfg.LoggerWriteSyncer), nil
|
||||||
|
} else {
|
||||||
|
return NewRaftLoggerZap(cfg.Logger), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
// getIDs returns an ordered set of IDs included in the given snapshot and
|
// getIDs returns an ordered set of IDs included in the given snapshot and
|
||||||
// the entries. The given snapshot/entries can contain three kinds of
|
// the entries. The given snapshot/entries can contain three kinds of
|
||||||
// ID-related entry:
|
// ID-related entry:
|
||||||
|
@ -539,7 +539,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
|
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
|
||||||
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
|
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
|
||||||
srv.lessor = lease.NewLessor(
|
srv.lessor = lease.NewLessor(
|
||||||
srv.getLogger(),
|
srv.Logger(),
|
||||||
srv.be,
|
srv.be,
|
||||||
lease.LessorConfig{
|
lease.LessorConfig{
|
||||||
MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
|
MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
|
||||||
@ -559,7 +559,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
cfg.Logger.Warn("failed to create token provider", zap.Error(err))
|
cfg.Logger.Warn("failed to create token provider", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
|
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
|
||||||
kvindex := srv.consistIndex.ConsistentIndex()
|
kvindex := srv.consistIndex.ConsistentIndex()
|
||||||
srv.lg.Debug("restore consistentIndex",
|
srv.lg.Debug("restore consistentIndex",
|
||||||
zap.Uint64("index", kvindex))
|
zap.Uint64("index", kvindex))
|
||||||
@ -577,7 +577,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, srv.consistIndex, tp, int(cfg.BcryptCost))
|
srv.authStore = auth.NewAuthStore(srv.Logger(), srv.be, srv.consistIndex, tp, int(cfg.BcryptCost))
|
||||||
|
|
||||||
newSrv := srv // since srv == nil in defer if srv is returned as nil
|
newSrv := srv // since srv == nil in defer if srv is returned as nil
|
||||||
defer func() {
|
defer func() {
|
||||||
@ -641,7 +641,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
return srv, nil
|
return srv, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *EtcdServer) getLogger() *zap.Logger {
|
func (s *EtcdServer) Logger() *zap.Logger {
|
||||||
s.lgMu.RLock()
|
s.lgMu.RLock()
|
||||||
l := s.lg
|
l := s.lg
|
||||||
s.lgMu.RUnlock()
|
s.lgMu.RUnlock()
|
||||||
@ -653,7 +653,7 @@ func tickToDur(ticks int, tickMs uint) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *EtcdServer) adjustTicks() {
|
func (s *EtcdServer) adjustTicks() {
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
clusterN := len(s.cluster.Members())
|
clusterN := len(s.cluster.Members())
|
||||||
|
|
||||||
// single-node fresh start, or single-node recovers from snapshot
|
// single-node fresh start, or single-node recovers from snapshot
|
||||||
@ -723,7 +723,7 @@ func (s *EtcdServer) Start() {
|
|||||||
s.GoAttach(func() { s.adjustTicks() })
|
s.GoAttach(func() { s.adjustTicks() })
|
||||||
s.GoAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
|
s.GoAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
|
||||||
s.GoAttach(s.purgeFile)
|
s.GoAttach(s.purgeFile)
|
||||||
s.GoAttach(func() { monitorFileDescriptor(s.getLogger(), s.stopping) })
|
s.GoAttach(func() { monitorFileDescriptor(s.Logger(), s.stopping) })
|
||||||
s.GoAttach(s.monitorVersions)
|
s.GoAttach(s.monitorVersions)
|
||||||
s.GoAttach(s.linearizableReadLoop)
|
s.GoAttach(s.linearizableReadLoop)
|
||||||
s.GoAttach(s.monitorKVHash)
|
s.GoAttach(s.monitorKVHash)
|
||||||
@ -734,7 +734,7 @@ func (s *EtcdServer) Start() {
|
|||||||
// modify a server's fields after it has been sent to Start.
|
// modify a server's fields after it has been sent to Start.
|
||||||
// This function is just used for testing.
|
// This function is just used for testing.
|
||||||
func (s *EtcdServer) start() {
|
func (s *EtcdServer) start() {
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
|
|
||||||
if s.Cfg.SnapshotCount == 0 {
|
if s.Cfg.SnapshotCount == 0 {
|
||||||
lg.Info(
|
lg.Info(
|
||||||
@ -786,7 +786,7 @@ func (s *EtcdServer) start() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *EtcdServer) purgeFile() {
|
func (s *EtcdServer) purgeFile() {
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
var dberrc, serrc, werrc <-chan error
|
var dberrc, serrc, werrc <-chan error
|
||||||
var dbdonec, sdonec, wdonec <-chan struct{}
|
var dbdonec, sdonec, wdonec <-chan struct{}
|
||||||
if s.Cfg.MaxSnapFiles > 0 {
|
if s.Cfg.MaxSnapFiles > 0 {
|
||||||
@ -853,7 +853,7 @@ type downgradeEnabledHandler struct {
|
|||||||
|
|
||||||
func (s *EtcdServer) DowngradeEnabledHandler() http.Handler {
|
func (s *EtcdServer) DowngradeEnabledHandler() http.Handler {
|
||||||
return &downgradeEnabledHandler{
|
return &downgradeEnabledHandler{
|
||||||
lg: s.getLogger(),
|
lg: s.Logger(),
|
||||||
cluster: s.cluster,
|
cluster: s.cluster,
|
||||||
server: s,
|
server: s,
|
||||||
}
|
}
|
||||||
@ -890,7 +890,7 @@ func (h *downgradeEnabledHandler) ServeHTTP(w http.ResponseWriter, r *http.Reque
|
|||||||
// Process takes a raft message and applies it to the server's raft state
|
// Process takes a raft message and applies it to the server's raft state
|
||||||
// machine, respecting any timeout of the given context.
|
// machine, respecting any timeout of the given context.
|
||||||
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
|
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
if s.cluster.IsIDRemoved(types.ID(m.From)) {
|
if s.cluster.IsIDRemoved(types.ID(m.From)) {
|
||||||
lg.Warn(
|
lg.Warn(
|
||||||
"rejected Raft message from removed member",
|
"rejected Raft message from removed member",
|
||||||
@ -933,7 +933,7 @@ type raftReadyHandler struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *EtcdServer) run() {
|
func (s *EtcdServer) run() {
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
|
|
||||||
sn, err := s.r.raftStorage.Snapshot()
|
sn, err := s.r.raftStorage.Snapshot()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1128,7 +1128,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
|
|||||||
}
|
}
|
||||||
applySnapshotInProgress.Inc()
|
applySnapshotInProgress.Inc()
|
||||||
|
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
lg.Info(
|
lg.Info(
|
||||||
"applying snapshot",
|
"applying snapshot",
|
||||||
zap.Uint64("current-snapshot-index", ep.snapi),
|
zap.Uint64("current-snapshot-index", ep.snapi),
|
||||||
@ -1261,7 +1261,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
|
|||||||
}
|
}
|
||||||
firsti := apply.entries[0].Index
|
firsti := apply.entries[0].Index
|
||||||
if firsti > ep.appliedi+1 {
|
if firsti > ep.appliedi+1 {
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
lg.Panic(
|
lg.Panic(
|
||||||
"unexpected committed entry index",
|
"unexpected committed entry index",
|
||||||
zap.Uint64("current-applied-index", ep.appliedi),
|
zap.Uint64("current-applied-index", ep.appliedi),
|
||||||
@ -1286,7 +1286,7 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
lg.Info(
|
lg.Info(
|
||||||
"triggering snapshot",
|
"triggering snapshot",
|
||||||
zap.String("local-member-id", s.ID().String()),
|
zap.String("local-member-id", s.ID().String()),
|
||||||
@ -1316,7 +1316,7 @@ func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) er
|
|||||||
now := time.Now()
|
now := time.Now()
|
||||||
interval := time.Duration(s.Cfg.TickMs) * time.Millisecond
|
interval := time.Duration(s.Cfg.TickMs) * time.Millisecond
|
||||||
|
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
lg.Info(
|
lg.Info(
|
||||||
"leadership transfer starting",
|
"leadership transfer starting",
|
||||||
zap.String("local-member-id", s.ID().String()),
|
zap.String("local-member-id", s.ID().String()),
|
||||||
@ -1346,7 +1346,7 @@ func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) er
|
|||||||
|
|
||||||
// TransferLeadership transfers the leader to the chosen transferee.
|
// TransferLeadership transfers the leader to the chosen transferee.
|
||||||
func (s *EtcdServer) TransferLeadership() error {
|
func (s *EtcdServer) TransferLeadership() error {
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
if !s.isLeader() {
|
if !s.isLeader() {
|
||||||
lg.Info(
|
lg.Info(
|
||||||
"skipped leadership transfer; local server is not leader",
|
"skipped leadership transfer; local server is not leader",
|
||||||
@ -1394,7 +1394,7 @@ func (s *EtcdServer) HardStop() {
|
|||||||
// Stop terminates the Server and performs any necessary finalization.
|
// Stop terminates the Server and performs any necessary finalization.
|
||||||
// Do and Process cannot be called after Stop has been invoked.
|
// Do and Process cannot be called after Stop has been invoked.
|
||||||
func (s *EtcdServer) Stop() {
|
func (s *EtcdServer) Stop() {
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
if err := s.TransferLeadership(); err != nil {
|
if err := s.TransferLeadership(); err != nil {
|
||||||
lg.Warn("leadership transfer failed", zap.String("local-member-id", s.ID().String()), zap.Error(err))
|
lg.Warn("leadership transfer failed", zap.String("local-member-id", s.ID().String()), zap.Error(err))
|
||||||
}
|
}
|
||||||
@ -1487,7 +1487,7 @@ func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) ([]*
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *EtcdServer) mayAddMember(memb membership.Member) error {
|
func (s *EtcdServer) mayAddMember(memb membership.Member) error {
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
if !s.Cfg.StrictReconfigCheck {
|
if !s.Cfg.StrictReconfigCheck {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -1613,7 +1613,7 @@ func (s *EtcdServer) promoteMember(ctx context.Context, id uint64) ([]*membershi
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *EtcdServer) mayPromoteMember(id types.ID) error {
|
func (s *EtcdServer) mayPromoteMember(id types.ID) error {
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
err := s.isLearnerReady(uint64(id))
|
err := s.isLearnerReady(uint64(id))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -1674,7 +1674,7 @@ func (s *EtcdServer) mayRemoveMember(id types.ID) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
isLearner := s.cluster.IsMemberExist(id) && s.cluster.Member(id).IsLearner
|
isLearner := s.cluster.IsMemberExist(id) && s.cluster.Member(id).IsLearner
|
||||||
// no need to check quorum when removing non-voting member
|
// no need to check quorum when removing non-voting member
|
||||||
if isLearner {
|
if isLearner {
|
||||||
@ -1798,7 +1798,7 @@ type confChangeResponse struct {
|
|||||||
// then waits for it to be applied to the server. It
|
// then waits for it to be applied to the server. It
|
||||||
// will block until the change is performed or there is an error.
|
// will block until the change is performed or there is an error.
|
||||||
func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*membership.Member, error) {
|
func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*membership.Member, error) {
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
cc.ID = s.reqIDGen.Next()
|
cc.ID = s.reqIDGen.Next()
|
||||||
ch := s.w.Register(cc.ID)
|
ch := s.w.Register(cc.ID)
|
||||||
|
|
||||||
@ -1864,7 +1864,7 @@ func (s *EtcdServer) publishV3(timeout time.Duration) {
|
|||||||
ClientUrls: s.attributes.ClientURLs,
|
ClientUrls: s.attributes.ClientURLs,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-s.stopping:
|
case <-s.stopping:
|
||||||
@ -1918,7 +1918,7 @@ func (s *EtcdServer) publishV3(timeout time.Duration) {
|
|||||||
// process publish requests through rafthttp
|
// process publish requests through rafthttp
|
||||||
// TODO: Deprecate v2 store in 3.6
|
// TODO: Deprecate v2 store in 3.6
|
||||||
func (s *EtcdServer) publish(timeout time.Duration) {
|
func (s *EtcdServer) publish(timeout time.Duration) {
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
b, err := json.Marshal(s.attributes)
|
b, err := json.Marshal(s.attributes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
lg.Panic("failed to marshal JSON", zap.Error(err))
|
lg.Panic("failed to marshal JSON", zap.Error(err))
|
||||||
@ -1973,7 +1973,7 @@ func (s *EtcdServer) publish(timeout time.Duration) {
|
|||||||
func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
|
func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
|
||||||
atomic.AddInt64(&s.inflightSnapshots, 1)
|
atomic.AddInt64(&s.inflightSnapshots, 1)
|
||||||
|
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
fields := []zap.Field{
|
fields := []zap.Field{
|
||||||
zap.String("from", s.ID().String()),
|
zap.String("from", s.ID().String()),
|
||||||
zap.String("to", types.ID(merged.To).String()),
|
zap.String("to", types.ID(merged.To).String()),
|
||||||
@ -2039,7 +2039,7 @@ func (s *EtcdServer) apply(
|
|||||||
s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})
|
s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})
|
||||||
|
|
||||||
default:
|
default:
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
lg.Panic(
|
lg.Panic(
|
||||||
"unknown entry type; must be either EntryNormal or EntryConfChange",
|
"unknown entry type; must be either EntryNormal or EntryConfChange",
|
||||||
zap.String("type", e.Type.String()),
|
zap.String("type", e.Type.String()),
|
||||||
@ -2120,7 +2120,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
lg.Warn(
|
lg.Warn(
|
||||||
"message exceeded backend quota; raising alarm",
|
"message exceeded backend quota; raising alarm",
|
||||||
zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes),
|
zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes),
|
||||||
@ -2148,7 +2148,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
|
|||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
*confState = *s.r.ApplyConfChange(cc)
|
*confState = *s.r.ApplyConfChange(cc)
|
||||||
switch cc.Type {
|
switch cc.Type {
|
||||||
case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode:
|
case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode:
|
||||||
@ -2222,7 +2222,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
|
|||||||
s.KV().Commit()
|
s.KV().Commit()
|
||||||
|
|
||||||
s.GoAttach(func() {
|
s.GoAttach(func() {
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
|
|
||||||
d, err := clone.SaveNoCopy()
|
d, err := clone.SaveNoCopy()
|
||||||
// TODO: current store will never fail to do a snapshot
|
// TODO: current store will never fail to do a snapshot
|
||||||
@ -2328,7 +2328,7 @@ func (s *EtcdServer) monitorVersions() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
v := decideClusterVersion(s.getLogger(), getVersions(s.getLogger(), s.cluster, s.id, s.peerRt))
|
v := decideClusterVersion(s.Logger(), getVersions(s.Logger(), s.cluster, s.id, s.peerRt))
|
||||||
if v != nil {
|
if v != nil {
|
||||||
// only keep major.minor version for comparison
|
// only keep major.minor version for comparison
|
||||||
v = &semver.Version{
|
v = &semver.Version{
|
||||||
@ -2356,7 +2356,7 @@ func (s *EtcdServer) monitorVersions() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *EtcdServer) updateClusterVersion(ver string) {
|
func (s *EtcdServer) updateClusterVersion(ver string) {
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
|
|
||||||
if s.cluster.Version() == nil {
|
if s.cluster.Version() == nil {
|
||||||
lg.Info(
|
lg.Info(
|
||||||
@ -2396,7 +2396,7 @@ func (s *EtcdServer) monitorDowngrade() {
|
|||||||
if t == 0 {
|
if t == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-time.After(t):
|
case <-time.After(t):
|
||||||
@ -2415,7 +2415,7 @@ func (s *EtcdServer) monitorDowngrade() {
|
|||||||
|
|
||||||
targetVersion := d.TargetVersion
|
targetVersion := d.TargetVersion
|
||||||
v := semver.Must(semver.NewVersion(targetVersion))
|
v := semver.Must(semver.NewVersion(targetVersion))
|
||||||
if isMatchedVersions(s.getLogger(), v, getVersions(s.getLogger(), s.cluster, s.id, s.peerRt)) {
|
if isMatchedVersions(s.Logger(), v, getVersions(s.Logger(), s.cluster, s.id, s.peerRt)) {
|
||||||
lg.Info("the cluster has been downgraded", zap.String("cluster-version", targetVersion))
|
lg.Info("the cluster has been downgraded", zap.String("cluster-version", targetVersion))
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
|
ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
|
||||||
if _, err := s.downgradeCancel(ctx); err != nil {
|
if _, err := s.downgradeCancel(ctx); err != nil {
|
||||||
@ -2492,7 +2492,7 @@ func (s *EtcdServer) GoAttach(f func()) {
|
|||||||
defer s.wgMu.RUnlock()
|
defer s.wgMu.RUnlock()
|
||||||
select {
|
select {
|
||||||
case <-s.stopping:
|
case <-s.stopping:
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
lg.Warn("server has stopped; skipping GoAttach")
|
lg.Warn("server has stopped; skipping GoAttach")
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
@ -2510,10 +2510,6 @@ func (s *EtcdServer) Alarms() []*pb.AlarmMember {
|
|||||||
return s.alarmStore.Get(pb.AlarmType_NONE)
|
return s.alarmStore.Get(pb.AlarmType_NONE)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *EtcdServer) Logger() *zap.Logger {
|
|
||||||
return s.lg
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsLearner returns if the local member is raft learner
|
// IsLearner returns if the local member is raft learner
|
||||||
func (s *EtcdServer) IsLearner() bool {
|
func (s *EtcdServer) IsLearner() bool {
|
||||||
return s.cluster.IsLocalMemberLearner()
|
return s.cluster.IsLocalMemberLearner()
|
||||||
|
@ -29,7 +29,7 @@ import (
|
|||||||
// a snapshot of v2 store inside raft.Snapshot as []byte, a snapshot of v3 KV in the top level message
|
// a snapshot of v2 store inside raft.Snapshot as []byte, a snapshot of v3 KV in the top level message
|
||||||
// as ReadCloser.
|
// as ReadCloser.
|
||||||
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message {
|
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message {
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
// get a snapshot of v2 store as []byte
|
// get a snapshot of v2 store as []byte
|
||||||
clone := s.v2store.Clone()
|
clone := s.v2store.Clone()
|
||||||
d, err := clone.SaveNoCopy()
|
d, err := clone.SaveNoCopy()
|
||||||
|
@ -93,7 +93,7 @@ type Authenticator interface {
|
|||||||
|
|
||||||
func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||||
trace := traceutil.New("range",
|
trace := traceutil.New("range",
|
||||||
s.getLogger(),
|
s.Logger(),
|
||||||
traceutil.Field{Key: "range_begin", Value: string(r.Key)},
|
traceutil.Field{Key: "range_begin", Value: string(r.Key)},
|
||||||
traceutil.Field{Key: "range_end", Value: string(r.RangeEnd)},
|
traceutil.Field{Key: "range_end", Value: string(r.RangeEnd)},
|
||||||
)
|
)
|
||||||
@ -102,7 +102,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
|
|||||||
var resp *pb.RangeResponse
|
var resp *pb.RangeResponse
|
||||||
var err error
|
var err error
|
||||||
defer func(start time.Time) {
|
defer func(start time.Time) {
|
||||||
warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), s.Cfg.WarningApplyDuration, start, r, resp, err)
|
warnOfExpensiveReadOnlyRangeRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, r, resp, err)
|
||||||
if resp != nil {
|
if resp != nil {
|
||||||
trace.AddField(
|
trace.AddField(
|
||||||
traceutil.Field{Key: "response_count", Value: len(resp.Kvs)},
|
traceutil.Field{Key: "response_count", Value: len(resp.Kvs)},
|
||||||
@ -151,7 +151,7 @@ func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest)
|
|||||||
func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
|
func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
|
||||||
if isTxnReadonly(r) {
|
if isTxnReadonly(r) {
|
||||||
trace := traceutil.New("transaction",
|
trace := traceutil.New("transaction",
|
||||||
s.getLogger(),
|
s.Logger(),
|
||||||
traceutil.Field{Key: "read_only", Value: true},
|
traceutil.Field{Key: "read_only", Value: true},
|
||||||
)
|
)
|
||||||
ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
|
ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
|
||||||
@ -169,7 +169,7 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
|
|||||||
}
|
}
|
||||||
|
|
||||||
defer func(start time.Time) {
|
defer func(start time.Time) {
|
||||||
warnOfExpensiveReadOnlyTxnRequest(s.getLogger(), s.Cfg.WarningApplyDuration, start, r, resp, err)
|
warnOfExpensiveReadOnlyTxnRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, r, resp, err)
|
||||||
trace.LogIfLong(traceThreshold)
|
trace.LogIfLong(traceThreshold)
|
||||||
}(time.Now())
|
}(time.Now())
|
||||||
|
|
||||||
@ -426,7 +426,7 @@ func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
|
|
||||||
var resp proto.Message
|
var resp proto.Message
|
||||||
for {
|
for {
|
||||||
@ -723,7 +723,7 @@ func (s *EtcdServer) linearizableReadLoop() {
|
|||||||
|
|
||||||
// as a single loop is can unlock multiple reads, it is not very useful
|
// as a single loop is can unlock multiple reads, it is not very useful
|
||||||
// to propagate the trace from Txn or Range.
|
// to propagate the trace from Txn or Range.
|
||||||
trace := traceutil.New("linearizableReadLoop", s.getLogger())
|
trace := traceutil.New("linearizableReadLoop", s.Logger())
|
||||||
nextnr := newNotifier()
|
nextnr := newNotifier()
|
||||||
|
|
||||||
s.readMu.Lock()
|
s.readMu.Lock()
|
||||||
@ -731,7 +731,7 @@ func (s *EtcdServer) linearizableReadLoop() {
|
|||||||
s.readNotifier = nextnr
|
s.readNotifier = nextnr
|
||||||
s.readMu.Unlock()
|
s.readMu.Unlock()
|
||||||
|
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
|
cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
|
||||||
if err := s.r.ReadIndex(cctx, ctxToSend); err != nil {
|
if err := s.r.ReadIndex(cctx, ctxToSend); err != nil {
|
||||||
cancel()
|
cancel()
|
||||||
@ -895,7 +895,7 @@ func (s *EtcdServer) downgradeValidate(ctx context.Context, v string) (*pb.Downg
|
|||||||
func (s *EtcdServer) downgradeEnable(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
|
func (s *EtcdServer) downgradeEnable(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
|
||||||
// validate downgrade capability before starting downgrade
|
// validate downgrade capability before starting downgrade
|
||||||
v := r.Version
|
v := r.Version
|
||||||
lg := s.getLogger()
|
lg := s.Logger()
|
||||||
if resp, err := s.downgradeValidate(ctx, v); err != nil {
|
if resp, err := s.downgradeValidate(ctx, v); err != nil {
|
||||||
lg.Warn("reject downgrade request", zap.Error(err))
|
lg.Warn("reject downgrade request", zap.Error(err))
|
||||||
return resp, err
|
return resp, err
|
||||||
|
Loading…
x
Reference in New Issue
Block a user