diff --git a/etcdserver/membership/cluster.go b/etcdserver/membership/cluster.go index 64f037846..fb19c3e60 100644 --- a/etcdserver/membership/cluster.go +++ b/etcdserver/membership/cluster.go @@ -290,6 +290,8 @@ func (c *RaftCluster) AddMember(m *Member) { } c.members[m.ID] = m + + plog.Infof("added member %s %v to cluster %s", m.ID, m.PeerURLs, c.id) } // RemoveMember removes a member from the store. @@ -306,6 +308,8 @@ func (c *RaftCluster) RemoveMember(id types.ID) { delete(c.members, id) c.removed[id] = true + + plog.Infof("removed member %s from cluster %s", id, c.id) } func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) { @@ -339,6 +343,8 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) if c.be != nil { mustSaveMemberToBackend(c.be, c.members[id]) } + + plog.Noticef("updated member %s %v in cluster %s", id, raftAttr.PeerURLs, c.id) } func (c *RaftCluster) Version() *semver.Version { diff --git a/etcdserver/server.go b/etcdserver/server.go index f93100bb0..ef38d6785 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -614,6 +614,9 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { return } + plog.Infof("applying snapshot at index %d...", ep.snapi) + defer plog.Infof("finished applying incoming snapshot at index %d", ep.snapi) + if apply.snapshot.Metadata.Index <= ep.appliedi { plog.Panicf("snapshot index [%d] should > appliedi[%d] + 1", apply.snapshot.Metadata.Index, ep.appliedi) @@ -630,17 +633,25 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { } newbe := backend.NewDefaultBackend(fn) + + plog.Info("restoring mvcc store...") + if err := s.kv.Restore(newbe); err != nil { plog.Panicf("restore KV error: %v", err) } s.consistIndex.setConsistentIndex(s.kv.ConsistentIndex()) + plog.Info("finished restoring mvcc store") + // Closing old backend might block until all the txns // on the backend are finished. // We do not want to wait on closing the old backend. s.bemu.Lock() oldbe := s.be go func() { + plog.Info("closing old backend...") + defer plog.Info("finished closing old backend") + if err := oldbe.Close(); err != nil { plog.Panicf("close backend error: %v", err) } @@ -650,36 +661,51 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { s.bemu.Unlock() if s.lessor != nil { + plog.Info("recovering lessor...") s.lessor.Recover(newbe, s.kv) + plog.Info("finished recovering lessor") } + plog.Info("recovering alarms...") if err := s.restoreAlarms(); err != nil { plog.Panicf("restore alarms error: %v", err) } + plog.Info("finished recovering alarms") if s.authStore != nil { + plog.Info("recovering auth store...") s.authStore.Recover(newbe) + plog.Info("finished recovering auth store") } + plog.Info("recovering store v2...") if err := s.store.Recovery(apply.snapshot.Data); err != nil { plog.Panicf("recovery store error: %v", err) } - s.cluster.SetBackend(s.be) - s.cluster.Recover() + plog.Info("finished recovering store v2") + s.cluster.SetBackend(s.be) + plog.Info("recovering cluster configuration...") + s.cluster.Recover() + plog.Info("finished recovering cluster configuration") + + plog.Info("removing old peers from network...") // recover raft transport s.r.transport.RemoveAllPeers() + plog.Info("finished removing old peers from network") + + plog.Info("adding peers from new cluster configuration into network...") for _, m := range s.cluster.Members() { if m.ID == s.ID() { continue } s.r.transport.AddPeer(m.ID, m.PeerURLs) } + plog.Info("finished adding peers from new cluster configuration into network...") ep.appliedi = apply.snapshot.Metadata.Index ep.snapi = ep.appliedi ep.confState = apply.snapshot.Metadata.ConfState - plog.Infof("recovered from incoming snapshot at index %d", ep.snapi) } func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) { @@ -1075,21 +1101,16 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con plog.Panicf("nodeID should always be equal to member ID") } s.cluster.AddMember(m) - if m.ID == s.id { - plog.Noticef("added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.cluster.ID()) - } else { + if m.ID != s.id { s.r.transport.AddPeer(m.ID, m.PeerURLs) - plog.Noticef("added member %s %v to cluster %s", m.ID, m.PeerURLs, s.cluster.ID()) } case raftpb.ConfChangeRemoveNode: id := types.ID(cc.NodeID) s.cluster.RemoveMember(id) if id == s.id { return true, nil - } else { - s.r.transport.RemovePeer(id) - plog.Noticef("removed member %s from cluster %s", id, s.cluster.ID()) } + s.r.transport.RemovePeer(id) case raftpb.ConfChangeUpdateNode: m := new(membership.Member) if err := json.Unmarshal(cc.Context, m); err != nil { @@ -1099,11 +1120,8 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con plog.Panicf("nodeID should always be equal to member ID") } s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes) - if m.ID == s.id { - plog.Noticef("update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.cluster.ID()) - } else { + if m.ID != s.id { s.r.transport.UpdatePeer(m.ID, m.PeerURLs) - plog.Noticef("update member %s %v in cluster %s", m.ID, m.PeerURLs, s.cluster.ID()) } } return false, nil diff --git a/rafthttp/peer.go b/rafthttp/peer.go index b978f1ad3..683d4f225 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -118,6 +118,9 @@ type peer struct { } func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer { + plog.Infof("starting peer %s...", to) + defer plog.Infof("started peer %s", to) + status := newPeerStatus(to) picker := newURLPicker(urls) pipeline := &pipeline{ @@ -270,6 +273,9 @@ func (p *peer) Resume() { } func (p *peer) stop() { + plog.Infof("stopping peer %s...", p.id) + defer plog.Infof("stopped peer %s", p.id) + close(p.stopc) p.cancel() p.msgAppV2Writer.stop() diff --git a/rafthttp/peer_status.go b/rafthttp/peer_status.go index be04b773c..706144f64 100644 --- a/rafthttp/peer_status.go +++ b/rafthttp/peer_status.go @@ -44,7 +44,7 @@ func (s *peerStatus) activate() { s.mu.Lock() defer s.mu.Unlock() if !s.active { - plog.Infof("the connection with %s became active", s.id) + plog.Infof("peer %s became active", s.id) s.active = true s.since = time.Now() } @@ -56,7 +56,7 @@ func (s *peerStatus) deactivate(failure failureType, reason string) { msg := fmt.Sprintf("failed to %s %s on %s (%s)", failure.action, s.id, failure.source, reason) if s.active { plog.Errorf(msg) - plog.Infof("the connection with %s became inactive", s.id) + plog.Infof("peer %s became inactive", s.id) s.active = false s.since = time.Time{} return diff --git a/rafthttp/pipeline.go b/rafthttp/pipeline.go index 081ca2b7d..58a5c7167 100644 --- a/rafthttp/pipeline.go +++ b/rafthttp/pipeline.go @@ -64,11 +64,13 @@ func (p *pipeline) start() { for i := 0; i < connPerPipeline; i++ { go p.handle() } + plog.Infof("started HTTP pipelining with peer %s", p.to) } func (p *pipeline) stop() { close(p.stopc) p.wg.Wait() + plog.Infof("stopped HTTP pipelining with peer %s", p.to) } func (p *pipeline) handle() { diff --git a/rafthttp/probing_status.go b/rafthttp/probing_status.go index 80068523f..a0859a4cb 100644 --- a/rafthttp/probing_status.go +++ b/rafthttp/probing_status.go @@ -48,7 +48,7 @@ func monitorProbingStatus(s probing.Status, id string) { select { case <-time.After(statusMonitoringInterval): if !s.Health() { - plog.Warningf("the connection to peer %s is unhealthy", id) + plog.Warningf("health check for peer %s failed", id) } if s.ClockDiff() > time.Second { plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second) diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 5ed4a2fc8..4fe3f9873 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -141,6 +141,8 @@ func (cw *streamWriter) run() { tickc := time.Tick(ConnReadTimeout / 3) unflushed := 0 + plog.Infof("started streaming with peer %s (writer)", cw.id) + for { select { case <-heartbeatc: @@ -155,7 +157,9 @@ func (cw *streamWriter) run() { } cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error()) + cw.close() + plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.id, t) heartbeatc, msgc = nil, nil case m := <-msgc: @@ -177,11 +181,14 @@ func (cw *streamWriter) run() { cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error()) cw.close() + plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.id, t) heartbeatc, msgc = nil, nil cw.r.ReportUnreachable(m.To) case conn := <-cw.connc: - cw.close() + if cw.close() { + plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.id, t) + } t = conn.t switch conn.t { case streamTypeMsgAppV2: @@ -198,10 +205,14 @@ func (cw *streamWriter) run() { cw.closer = conn.Closer cw.working = true cw.mu.Unlock() + plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.id, t) heartbeatc, msgc = tickc, cw.msgc case <-cw.stopc: - cw.close() + if cw.close() { + plog.Infof("closed the TCP streaming connection with peer %s (%s writer)", cw.id, t) + } close(cw.done) + plog.Infof("stopped streaming with peer %s (writer)", cw.id) return } } @@ -213,11 +224,11 @@ func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) { return cw.msgc, cw.working } -func (cw *streamWriter) close() { +func (cw *streamWriter) close() bool { cw.mu.Lock() defer cw.mu.Unlock() if !cw.working { - return + return false } cw.closer.Close() if len(cw.msgc) > 0 { @@ -225,6 +236,7 @@ func (cw *streamWriter) close() { } cw.msgc = make(chan raftpb.Message, streamBufSize) cw.working = false + return true } func (cw *streamWriter) attach(conn *outgoingConn) bool { @@ -275,8 +287,9 @@ func (r *streamReader) start() { } func (cr *streamReader) run() { + t := cr.typ + plog.Infof("started streaming with peer %s (%s reader)", cr.to, t) for { - t := cr.typ rc, err := cr.dial(t) if err != nil { if err != errUnsupportedStreamType { @@ -284,7 +297,9 @@ func (cr *streamReader) run() { } } else { cr.status.activate() + plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.to, cr.typ) err := cr.decodeLoop(rc, t) + plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.to, cr.typ) switch { // all data is read out case err == io.EOF: @@ -300,6 +315,7 @@ func (cr *streamReader) run() { case <-time.After(100 * time.Millisecond): case <-cr.stopc: close(cr.done) + plog.Infof("stopped streaming with peer %s (%s reader)", cr.to, t) return } } diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 9f1e15894..ab459b0f3 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -231,6 +231,7 @@ func (t *Transport) AddRemote(id types.ID, us []string) { func (t *Transport) AddPeer(id types.ID, us []string) { t.mu.Lock() defer t.mu.Unlock() + if t.peers == nil { panic("transport stopped") } @@ -244,6 +245,8 @@ func (t *Transport) AddPeer(id types.ID, us []string) { fs := t.LeaderStats.Follower(id.String()) t.peers[id] = startPeer(t, urls, t.ID, id, t.ClusterID, t.Raft, fs, t.ErrorC) addPeerToProber(t.prober, id.String(), us) + + plog.Infof("added peer %s", id) } func (t *Transport) RemovePeer(id types.ID) { @@ -270,6 +273,7 @@ func (t *Transport) removePeer(id types.ID) { delete(t.peers, id) delete(t.LeaderStats.Followers, id.String()) t.prober.Remove(id.String()) + plog.Infof("removed peer %s", id) } func (t *Transport) UpdatePeer(id types.ID, us []string) { @@ -287,6 +291,7 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) { t.prober.Remove(id.String()) addPeerToProber(t.prober, id.String(), us) + plog.Infof("updated peer %s", id) } func (t *Transport) ActiveSince(id types.ID) time.Time {