*: more logging on critical state change

Add more logging for better debugging purposes.
Xiang Li 2016-05-31 21:12:41 -07:00
parent fc06dd1452
commit 8528c8c599
8 changed files with 75 additions and 22 deletions
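
The pattern applied throughout the diff below is the same in each file: emit a log line immediately before a critical state change (snapshot apply, member add/remove, peer start/stop, stream setup/teardown) and a matching "finished ..." line when it completes, often via defer so completion is recorded even on an early return. A minimal sketch of that pattern, assuming the capnslog package that backs etcd's plog logger; the repo/package names and the applySnapshot stand-in are illustrative only, not etcd's actual code:

package main

import (
	"time"

	"github.com/coreos/pkg/capnslog"
)

// plog mirrors etcd's package-level logger; "example"/"snapshot" are
// placeholder names for illustration.
var plog = capnslog.NewPackageLogger("example", "snapshot")

// applySnapshot stands in for a critical state change. The commit's pattern:
// log right before the operation starts, and defer the matching "finished"
// log so the end of the operation is recorded even if the function returns early.
func applySnapshot(index uint64) {
	plog.Infof("applying snapshot at index %d...", index)
	defer plog.Infof("finished applying incoming snapshot at index %d", index)

	// ... the actual state change would happen here ...
	time.Sleep(10 * time.Millisecond)
}

func main() {
	capnslog.SetGlobalLogLevel(capnslog.INFO)
	applySnapshot(42)
}

The stream.go changes follow the same idea with one twist: close() now returns a bool reporting whether it actually tore down a connection, so the "closed ... connection" message is only logged when a real state change happened.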


@@ -290,6 +290,8 @@ func (c *RaftCluster) AddMember(m *Member) {
 	}
 	c.members[m.ID] = m
+
+	plog.Infof("added member %s %v to cluster %s", m.ID, m.PeerURLs, c.id)
 }
 
 // RemoveMember removes a member from the store.
@@ -306,6 +308,8 @@ func (c *RaftCluster) RemoveMember(id types.ID) {
 	delete(c.members, id)
 	c.removed[id] = true
+
+	plog.Infof("removed member %s from cluster %s", id, c.id)
 }
 
 func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) {
@@ -339,6 +343,8 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes)
 	if c.be != nil {
 		mustSaveMemberToBackend(c.be, c.members[id])
 	}
+
+	plog.Noticef("updated member %s %v in cluster %s", id, raftAttr.PeerURLs, c.id)
 }
 
 func (c *RaftCluster) Version() *semver.Version {


@@ -614,6 +614,9 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 		return
 	}
+	plog.Infof("applying snapshot at index %d...", ep.snapi)
+	defer plog.Infof("finished applying incoming snapshot at index %d", ep.snapi)
+
 	if apply.snapshot.Metadata.Index <= ep.appliedi {
 		plog.Panicf("snapshot index [%d] should > appliedi[%d] + 1",
 			apply.snapshot.Metadata.Index, ep.appliedi)
@@ -630,17 +633,25 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 	}
 	newbe := backend.NewDefaultBackend(fn)
+
+	plog.Info("restoring mvcc store...")
 	if err := s.kv.Restore(newbe); err != nil {
 		plog.Panicf("restore KV error: %v", err)
 	}
 	s.consistIndex.setConsistentIndex(s.kv.ConsistentIndex())
+	plog.Info("finished restoring mvcc store")
+
 	// Closing old backend might block until all the txns
 	// on the backend are finished.
 	// We do not want to wait on closing the old backend.
 	s.bemu.Lock()
 	oldbe := s.be
 	go func() {
+		plog.Info("closing old backend...")
+		defer plog.Info("finished closing old backend")
+
 		if err := oldbe.Close(); err != nil {
 			plog.Panicf("close backend error: %v", err)
 		}
@@ -650,36 +661,51 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 	s.bemu.Unlock()
 
 	if s.lessor != nil {
+		plog.Info("recovering lessor...")
 		s.lessor.Recover(newbe, s.kv)
+		plog.Info("finished recovering lessor")
 	}
 
+	plog.Info("recovering alarms...")
 	if err := s.restoreAlarms(); err != nil {
 		plog.Panicf("restore alarms error: %v", err)
 	}
+	plog.Info("finished recovering alarms")
 
 	if s.authStore != nil {
+		plog.Info("recovering auth store...")
 		s.authStore.Recover(newbe)
+		plog.Info("finished recovering auth store")
 	}
 
+	plog.Info("recovering store v2...")
 	if err := s.store.Recovery(apply.snapshot.Data); err != nil {
 		plog.Panicf("recovery store error: %v", err)
 	}
-	s.cluster.SetBackend(s.be)
-	s.cluster.Recover()
+	plog.Info("finished recovering store v2")
+
+	s.cluster.SetBackend(s.be)
+	plog.Info("recovering cluster configuration...")
+	s.cluster.Recover()
+	plog.Info("finished recovering cluster configuration")
 
+	plog.Info("removing old peers from network...")
 	// recover raft transport
 	s.r.transport.RemoveAllPeers()
+	plog.Info("finished removing old peers from network")
 
+	plog.Info("adding peers from new cluster configuration into network...")
 	for _, m := range s.cluster.Members() {
 		if m.ID == s.ID() {
 			continue
 		}
 		s.r.transport.AddPeer(m.ID, m.PeerURLs)
 	}
+	plog.Info("finished adding peers from new cluster configuration into network...")
 
 	ep.appliedi = apply.snapshot.Metadata.Index
 	ep.snapi = ep.appliedi
 	ep.confState = apply.snapshot.Metadata.ConfState
-	plog.Infof("recovered from incoming snapshot at index %d", ep.snapi)
 }
 
 func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
@@ -1075,21 +1101,16 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 			plog.Panicf("nodeID should always be equal to member ID")
 		}
 		s.cluster.AddMember(m)
-		if m.ID == s.id {
-			plog.Noticef("added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.cluster.ID())
-		} else {
+		if m.ID != s.id {
 			s.r.transport.AddPeer(m.ID, m.PeerURLs)
-			plog.Noticef("added member %s %v to cluster %s", m.ID, m.PeerURLs, s.cluster.ID())
 		}
 	case raftpb.ConfChangeRemoveNode:
 		id := types.ID(cc.NodeID)
 		s.cluster.RemoveMember(id)
 		if id == s.id {
 			return true, nil
-		} else {
-			s.r.transport.RemovePeer(id)
-			plog.Noticef("removed member %s from cluster %s", id, s.cluster.ID())
 		}
+		s.r.transport.RemovePeer(id)
 	case raftpb.ConfChangeUpdateNode:
 		m := new(membership.Member)
 		if err := json.Unmarshal(cc.Context, m); err != nil {
@@ -1099,11 +1120,8 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 			plog.Panicf("nodeID should always be equal to member ID")
 		}
 		s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes)
-		if m.ID == s.id {
-			plog.Noticef("update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.cluster.ID())
-		} else {
+		if m.ID != s.id {
 			s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
-			plog.Noticef("update member %s %v in cluster %s", m.ID, m.PeerURLs, s.cluster.ID())
 		}
 	}
 	return false, nil


@@ -118,6 +118,9 @@ type peer struct {
 }
 
 func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer {
+	plog.Infof("starting peer %s...", to)
+	defer plog.Infof("started peer %s", to)
+
 	status := newPeerStatus(to)
 	picker := newURLPicker(urls)
 	pipeline := &pipeline{
@@ -270,6 +273,9 @@ func (p *peer) Resume() {
 }
 
 func (p *peer) stop() {
+	plog.Infof("stopping peer %s...", p.id)
+	defer plog.Infof("stopped peer %s", p.id)
+
 	close(p.stopc)
 	p.cancel()
 	p.msgAppV2Writer.stop()


@@ -44,7 +44,7 @@ func (s *peerStatus) activate() {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if !s.active {
-		plog.Infof("the connection with %s became active", s.id)
+		plog.Infof("peer %s became active", s.id)
 		s.active = true
 		s.since = time.Now()
 	}
@@ -56,7 +56,7 @@ func (s *peerStatus) deactivate(failure failureType, reason string) {
 	msg := fmt.Sprintf("failed to %s %s on %s (%s)", failure.action, s.id, failure.source, reason)
 	if s.active {
 		plog.Errorf(msg)
-		plog.Infof("the connection with %s became inactive", s.id)
+		plog.Infof("peer %s became inactive", s.id)
 		s.active = false
 		s.since = time.Time{}
 		return


@@ -64,11 +64,13 @@ func (p *pipeline) start() {
 	for i := 0; i < connPerPipeline; i++ {
 		go p.handle()
 	}
+	plog.Infof("started HTTP pipelining with peer %s", p.to)
 }
 
 func (p *pipeline) stop() {
 	close(p.stopc)
 	p.wg.Wait()
+	plog.Infof("stopped HTTP pipelining with peer %s", p.to)
 }
 
 func (p *pipeline) handle() {


@@ -48,7 +48,7 @@ func monitorProbingStatus(s probing.Status, id string) {
 		select {
 		case <-time.After(statusMonitoringInterval):
 			if !s.Health() {
-				plog.Warningf("the connection to peer %s is unhealthy", id)
+				plog.Warningf("health check for peer %s failed", id)
 			}
 			if s.ClockDiff() > time.Second {
 				plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second)


@@ -141,6 +141,8 @@ func (cw *streamWriter) run() {
 	tickc := time.Tick(ConnReadTimeout / 3)
 	unflushed := 0
 
+	plog.Infof("started streaming with peer %s (writer)", cw.id)
+
 	for {
 		select {
 		case <-heartbeatc:
@@ -155,7 +157,9 @@
 			}
 			cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error())
+
 			cw.close()
+			plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.id, t)
 			heartbeatc, msgc = nil, nil
 
 		case m := <-msgc:
@@ -177,11 +181,14 @@
 			cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error())
+
 			cw.close()
+			plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.id, t)
 			heartbeatc, msgc = nil, nil
 			cw.r.ReportUnreachable(m.To)
 
 		case conn := <-cw.connc:
-			cw.close()
+			if cw.close() {
+				plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.id, t)
+			}
 			t = conn.t
 			switch conn.t {
 			case streamTypeMsgAppV2:
@@ -198,10 +205,14 @@
 			cw.closer = conn.Closer
 			cw.working = true
 			cw.mu.Unlock()
+			plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.id, t)
 			heartbeatc, msgc = tickc, cw.msgc
 
 		case <-cw.stopc:
-			cw.close()
+			if cw.close() {
+				plog.Infof("closed the TCP streaming connection with peer %s (%s writer)", cw.id, t)
+			}
 			close(cw.done)
+			plog.Infof("stopped streaming with peer %s (writer)", cw.id)
 			return
 		}
 	}
@@ -213,11 +224,11 @@ func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) {
 	return cw.msgc, cw.working
 }
 
-func (cw *streamWriter) close() {
+func (cw *streamWriter) close() bool {
 	cw.mu.Lock()
 	defer cw.mu.Unlock()
 	if !cw.working {
-		return
+		return false
 	}
 	cw.closer.Close()
 	if len(cw.msgc) > 0 {
@@ -225,6 +236,7 @@ func (cw *streamWriter) close() {
 	}
 	cw.msgc = make(chan raftpb.Message, streamBufSize)
 	cw.working = false
+	return true
 }
 
 func (cw *streamWriter) attach(conn *outgoingConn) bool {
@@ -275,8 +287,9 @@ func (r *streamReader) start() {
 }
 
 func (cr *streamReader) run() {
+	t := cr.typ
+	plog.Infof("started streaming with peer %s (%s reader)", cr.to, t)
 	for {
-		t := cr.typ
 		rc, err := cr.dial(t)
 		if err != nil {
 			if err != errUnsupportedStreamType {
@@ -284,7 +297,9 @@ func (cr *streamReader) run() {
 			}
 		} else {
 			cr.status.activate()
+			plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.to, cr.typ)
 			err := cr.decodeLoop(rc, t)
+			plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.to, cr.typ)
 			switch {
 			// all data is read out
 			case err == io.EOF:
@@ -300,6 +315,7 @@ func (cr *streamReader) run() {
 		case <-time.After(100 * time.Millisecond):
 		case <-cr.stopc:
 			close(cr.done)
+			plog.Infof("stopped streaming with peer %s (%s reader)", cr.to, t)
 			return
 		}
 	}


@@ -231,6 +231,7 @@ func (t *Transport) AddRemote(id types.ID, us []string) {
 func (t *Transport) AddPeer(id types.ID, us []string) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
+
 	if t.peers == nil {
 		panic("transport stopped")
 	}
@@ -244,6 +245,8 @@
 	fs := t.LeaderStats.Follower(id.String())
 	t.peers[id] = startPeer(t, urls, t.ID, id, t.ClusterID, t.Raft, fs, t.ErrorC)
 	addPeerToProber(t.prober, id.String(), us)
+
+	plog.Infof("added peer %s", id)
 }
 
 func (t *Transport) RemovePeer(id types.ID) {
@@ -270,6 +273,7 @@ func (t *Transport) removePeer(id types.ID) {
 	delete(t.peers, id)
 	delete(t.LeaderStats.Followers, id.String())
 	t.prober.Remove(id.String())
+	plog.Infof("removed peer %s", id)
 }
 
 func (t *Transport) UpdatePeer(id types.ID, us []string) {
@@ -287,6 +291,7 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) {
 	t.prober.Remove(id.String())
 	addPeerToProber(t.prober, id.String(), us)
+	plog.Infof("updated peer %s", id)
 }
 
 func (t *Transport) ActiveSince(id types.ID) time.Time {