From e0f97966531b7677e9130cfbd8482620735f709b Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 8 Jun 2015 13:28:34 -0700 Subject: [PATCH] etcdserver: use leveled logging Leveled logging for etcdserver pkg. --- etcdserver/cluster.go | 33 +++++++------- etcdserver/cluster_util.go | 31 +++++++------ etcdserver/config.go | 25 +++++------ etcdserver/member.go | 5 +-- etcdserver/metrics.go | 7 ++- etcdserver/raft.go | 29 ++++++------ etcdserver/server.go | 92 +++++++++++++++++++------------------- etcdserver/server_test.go | 5 --- etcdserver/storage.go | 13 +++--- 9 files changed, 115 insertions(+), 125 deletions(-) diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index 1a12831bf..f8fbfb403 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -20,7 +20,6 @@ import ( "encoding/binary" "encoding/json" "fmt" - "log" "path" "reflect" "sort" @@ -132,7 +131,7 @@ func (c *cluster) MemberByName(name string) *Member { for _, m := range c.members { if m.Name == name { if memb != nil { - log.Panicf("two members with the given name %q exist", name) + plog.Panicf("two members with the given name %q exist", name) } memb = m } @@ -245,7 +244,7 @@ func (c *cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { } m := new(Member) if err := json.Unmarshal(cc.Context, m); err != nil { - log.Panicf("unmarshal member should never fail: %v", err) + plog.Panicf("unmarshal member should never fail: %v", err) } for _, u := range m.PeerURLs { if urls[u] { @@ -271,7 +270,7 @@ func (c *cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { } m := new(Member) if err := json.Unmarshal(cc.Context, m); err != nil { - log.Panicf("unmarshal member should never fail: %v", err) + plog.Panicf("unmarshal member should never fail: %v", err) } for _, u := range m.PeerURLs { if urls[u] { @@ -279,7 +278,7 @@ func (c *cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { } } default: - log.Panicf("ConfChange type should be either AddNode, RemoveNode or UpdateNode") + plog.Panicf("ConfChange type should be either AddNode, RemoveNode or UpdateNode") } return nil } @@ -292,11 +291,11 @@ func (c *cluster) AddMember(m *Member) { defer c.Unlock() b, err := json.Marshal(m.RaftAttributes) if err != nil { - log.Panicf("marshal raftAttributes should never fail: %v", err) + plog.Panicf("marshal raftAttributes should never fail: %v", err) } p := path.Join(memberStoreKey(m.ID), raftAttributesSuffix) if _, err := c.store.Create(p, false, string(b), false, store.Permanent); err != nil { - log.Panicf("create raftAttributes should never fail: %v", err) + plog.Panicf("create raftAttributes should never fail: %v", err) } c.members[m.ID] = m } @@ -307,11 +306,11 @@ func (c *cluster) RemoveMember(id types.ID) { c.Lock() defer c.Unlock() if _, err := c.store.Delete(memberStoreKey(id), true, true); err != nil { - log.Panicf("delete member should never fail: %v", err) + plog.Panicf("delete member should never fail: %v", err) } delete(c.members, id) if _, err := c.store.Create(removedMemberStoreKey(id), false, "", false, store.Permanent); err != nil { - log.Panicf("create removedMember should never fail: %v", err) + plog.Panicf("create removedMember should never fail: %v", err) } c.removed[id] = true } @@ -328,11 +327,11 @@ func (c *cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) { defer c.Unlock() b, err := json.Marshal(raftAttr) if err != nil { - log.Panicf("marshal raftAttributes should never fail: %v", err) + plog.Panicf("marshal raftAttributes should never fail: %v", err) } p := path.Join(memberStoreKey(id), raftAttributesSuffix) if _, err := c.store.Update(p, string(b), store.Permanent); err != nil { - log.Panicf("update raftAttributes should never fail: %v", err) + plog.Panicf("update raftAttributes should never fail: %v", err) } c.members[id].RaftAttributes = raftAttr } @@ -350,9 +349,9 @@ func (c *cluster) SetVersion(ver *semver.Version) { c.Lock() defer c.Unlock() if c.version != nil { - log.Printf("etcdsever: updated the cluster version from %v to %v", c.version.String(), ver.String()) + plog.Noticef("updated the cluster version from %v to %v", c.version.String(), ver.String()) } else { - log.Printf("etcdsever: set the initial cluster version to %v", ver.String()) + plog.Noticef("set the initial cluster version to %v", ver.String()) } c.version = ver } @@ -365,12 +364,12 @@ func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool) if isKeyNotFound(err) { return members, removed } - log.Panicf("get storeMembers should never fail: %v", err) + plog.Panicf("get storeMembers should never fail: %v", err) } for _, n := range e.Node.Nodes { m, err := nodeToMember(n) if err != nil { - log.Panicf("nodeToMember should never fail: %v", err) + plog.Panicf("nodeToMember should never fail: %v", err) } members[m.ID] = m } @@ -380,7 +379,7 @@ func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool) if isKeyNotFound(err) { return members, removed } - log.Panicf("get storeRemovedMembers should never fail: %v", err) + plog.Panicf("get storeRemovedMembers should never fail: %v", err) } for _, n := range e.Node.Nodes { removed[mustParseMemberIDFromKey(n.Key)] = true @@ -394,7 +393,7 @@ func clusterVersionFromStore(st store.Store) *semver.Version { if isKeyNotFound(err) { return nil } - log.Panicf("etcdserver: unexpected error (%v) when getting cluster version from store", err) + plog.Panicf("unexpected error (%v) when getting cluster version from store", err) } return semver.Must(semver.NewVersion(*e.Node.Value)) } diff --git a/etcdserver/cluster_util.go b/etcdserver/cluster_util.go index aa11e513f..5dd4017c4 100644 --- a/etcdserver/cluster_util.go +++ b/etcdserver/cluster_util.go @@ -18,7 +18,6 @@ import ( "encoding/json" "fmt" "io/ioutil" - "log" "net/http" "sort" "time" @@ -65,34 +64,34 @@ func getClusterFromRemotePeers(urls []string, logerr bool, tr *http.Transport) ( resp, err := cc.Get(u + "/members") if err != nil { if logerr { - log.Printf("etcdserver: could not get cluster response from %s: %v", u, err) + plog.Warningf("could not get cluster response from %s: %v", u, err) } continue } b, err := ioutil.ReadAll(resp.Body) if err != nil { if logerr { - log.Printf("etcdserver: could not read the body of cluster response: %v", err) + plog.Warningf("could not read the body of cluster response: %v", err) } continue } var membs []*Member if err := json.Unmarshal(b, &membs); err != nil { if logerr { - log.Printf("etcdserver: could not unmarshal cluster response: %v", err) + plog.Warningf("could not unmarshal cluster response: %v", err) } continue } id, err := types.IDFromString(resp.Header.Get("X-Etcd-Cluster-ID")) if err != nil { if logerr { - log.Printf("etcdserver: could not parse the cluster ID from cluster res: %v", err) + plog.Warningf("could not parse the cluster ID from cluster res: %v", err) } continue } return newClusterFromMembers("", id, membs), nil } - return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls") + return nil, fmt.Errorf("could not retrieve cluster information from the given urls") } // getRemotePeerURLs returns peer urls of remote members in the cluster. The @@ -127,7 +126,7 @@ func getVersions(cl Cluster, local types.ID, tr *http.Transport) map[string]*ver } ver, err := getVersion(m, tr) if err != nil { - log.Printf("etcdserver: cannot get the version of member %s (%v)", m.ID, err) + plog.Warningf("cannot get the version of member %s (%v)", m.ID, err) vers[m.ID.String()] = nil } else { vers[m.ID.String()] = ver @@ -149,12 +148,12 @@ func decideClusterVersion(vers map[string]*version.Versions) *semver.Version { } v, err := semver.NewVersion(ver.Server) if err != nil { - log.Printf("etcdserver: cannot understand the version of member %s (%v)", mid, err) + plog.Errorf("cannot understand the version of member %s (%v)", mid, err) return nil } if lv.LessThan(*v) { - log.Printf("etcdserver: the etcd version %s is not up-to-date", lv.String()) - log.Printf("etcdserver: member %s has a higher version %s", mid, ver) + plog.Warningf("the etcd version %s is not up-to-date", lv.String()) + plog.Warningf("member %s has a higher version %s", mid, ver) } if cv == nil { cv = v @@ -195,15 +194,15 @@ func isCompatibleWithVers(vers map[string]*version.Versions, local types.ID, min } clusterv, err := semver.NewVersion(v.Cluster) if err != nil { - log.Printf("etcdserver: cannot understand the cluster version of member %s (%v)", id, err) + plog.Errorf("cannot understand the cluster version of member %s (%v)", id, err) continue } if clusterv.LessThan(*minV) { - log.Printf("etcdserver: the running cluster version(%v) is lower than the minimal cluster version(%v) supported", clusterv.String(), minV.String()) + plog.Warningf("the running cluster version(%v) is lower than the minimal cluster version(%v) supported", clusterv.String(), minV.String()) return false } if maxV.LessThan(*clusterv) { - log.Printf("etcdserver: the running cluster version(%v) is higher than the maximum cluster version(%v) supported", clusterv.String(), maxV.String()) + plog.Warningf("the running cluster version(%v) is higher than the maximum cluster version(%v) supported", clusterv.String(), maxV.String()) return false } ok = true @@ -226,7 +225,7 @@ func getVersion(m *Member, tr *http.Transport) (*version.Versions, error) { for _, u := range m.PeerURLs { resp, err = cc.Get(u + "/version") if err != nil { - log.Printf("etcdserver: failed to reach the peerURL(%s) of member %s (%v)", u, m.ID, err) + plog.Warningf("failed to reach the peerURL(%s) of member %s (%v)", u, m.ID, err) continue } // etcd 2.0 does not have version endpoint on peer url. @@ -241,12 +240,12 @@ func getVersion(m *Member, tr *http.Transport) (*version.Versions, error) { b, err := ioutil.ReadAll(resp.Body) resp.Body.Close() if err != nil { - log.Printf("etcdserver: failed to read out the response body from the peerURL(%s) of member %s (%v)", u, m.ID, err) + plog.Warningf("failed to read out the response body from the peerURL(%s) of member %s (%v)", u, m.ID, err) continue } var vers version.Versions if err := json.Unmarshal(b, &vers); err != nil { - log.Printf("etcdserver: failed to unmarshal the response body got from the peerURL(%s) of member %s (%v)", u, m.ID, err) + plog.Warningf("failed to unmarshal the response body got from the peerURL(%s) of member %s (%v)", u, m.ID, err) continue } return &vers, nil diff --git a/etcdserver/config.go b/etcdserver/config.go index afeb5558d..316797417 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -16,7 +16,6 @@ package etcdserver import ( "fmt" - "log" "net/http" "path" "reflect" @@ -114,25 +113,25 @@ func (c *ServerConfig) PrintWithInitial() { c.print(true) } func (c *ServerConfig) Print() { c.print(false) } func (c *ServerConfig) print(initial bool) { - log.Printf("etcdserver: name = %s", c.Name) + plog.Infof("name = %s", c.Name) if c.ForceNewCluster { - log.Println("etcdserver: force new cluster") + plog.Infof("force new cluster") } - log.Printf("etcdserver: data dir = %s", c.DataDir) - log.Printf("etcdserver: member dir = %s", c.MemberDir()) - log.Printf("etcdserver: heartbeat = %dms", c.TickMs) - log.Printf("etcdserver: election = %dms", c.ElectionTicks*int(c.TickMs)) - log.Printf("etcdserver: snapshot count = %d", c.SnapCount) + plog.Infof("data dir = %s", c.DataDir) + plog.Infof("member dir = %s", c.MemberDir()) + plog.Infof("heartbeat = %dms", c.TickMs) + plog.Infof("election = %dms", c.ElectionTicks*int(c.TickMs)) + plog.Infof("snapshot count = %d", c.SnapCount) if len(c.DiscoveryURL) != 0 { - log.Printf("etcdserver: discovery URL= %s", c.DiscoveryURL) + plog.Infof("discovery URL= %s", c.DiscoveryURL) if len(c.DiscoveryProxy) != 0 { - log.Printf("etcdserver: discovery proxy = %s", c.DiscoveryProxy) + plog.Infof("discovery proxy = %s", c.DiscoveryProxy) } } - log.Printf("etcdserver: advertise client URLs = %s", c.ClientURLs) + plog.Infof("advertise client URLs = %s", c.ClientURLs) if initial { - log.Printf("etcdserver: initial advertise peer URLs = %s", c.PeerURLs) - log.Printf("etcdserver: initial cluster = %s", c.InitialPeerURLsMap) + plog.Infof("initial advertise peer URLs = %s", c.PeerURLs) + plog.Infof("initial cluster = %s", c.InitialPeerURLsMap) } } diff --git a/etcdserver/member.go b/etcdserver/member.go index 2aee85aa1..2d1c841e8 100644 --- a/etcdserver/member.go +++ b/etcdserver/member.go @@ -19,7 +19,6 @@ import ( "encoding/binary" "encoding/json" "fmt" - "log" "math/rand" "path" "sort" @@ -80,7 +79,7 @@ func NewMember(name string, peerURLs types.URLs, clusterName string, now *time.T // It will panic if there is no PeerURLs available in Member. func (m *Member) PickPeerURL() string { if len(m.PeerURLs) == 0 { - log.Panicf("member should always have some peer url") + plog.Panicf("member should always have some peer url") } return m.PeerURLs[rand.Intn(len(m.PeerURLs))] } @@ -117,7 +116,7 @@ func MemberAttributesStorePath(id types.ID) string { func mustParseMemberIDFromKey(key string) types.ID { id, err := types.IDFromString(path.Base(key)) if err != nil { - log.Panicf("unexpected parse member id error: %v", err) + plog.Panicf("unexpected parse member id error: %v", err) } return id } diff --git a/etcdserver/metrics.go b/etcdserver/metrics.go index 296b18fdb..4a9713b61 100644 --- a/etcdserver/metrics.go +++ b/etcdserver/metrics.go @@ -15,7 +15,6 @@ package etcdserver import ( - "log" "time" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus" @@ -66,17 +65,17 @@ func monitorFileDescriptor(done <-chan struct{}) { for { used, err := runtime.FDUsage() if err != nil { - log.Printf("etcdserver: cannot monitor file descriptor usage (%v)", err) + plog.Errorf("cannot monitor file descriptor usage (%v)", err) return } fileDescriptorUsed.Set(float64(used)) limit, err := runtime.FDLimit() if err != nil { - log.Printf("etcdserver: cannot monitor file descriptor usage (%v)", err) + plog.Errorf("cannot monitor file descriptor usage (%v)", err) return } if used >= limit/5*4 { - log.Printf("etcdserver: 80%% of the file descriptor limit is used [used = %d, limit = %d]", used, limit) + plog.Warningf("80%% of the file descriptor limit is used [used = %d, limit = %d]", used, limit) } select { case <-ticker.C: diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 040810a01..be2509c23 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -17,7 +17,6 @@ package etcdserver import ( "encoding/json" "expvar" - "log" "os" "sort" "sync/atomic" @@ -149,13 +148,13 @@ func (r *raftNode) run() { if !raft.IsEmptySnap(rd.Snapshot) { if err := r.storage.SaveSnap(rd.Snapshot); err != nil { - log.Fatalf("etcdraft: save snapshot error: %v", err) + plog.Fatalf("raft save snapshot error: %v", err) } r.raftStorage.ApplySnapshot(rd.Snapshot) - log.Printf("etcdraft: applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index) + plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index) } if err := r.storage.Save(rd.HardState, rd.Entries); err != nil { - log.Fatalf("etcdraft: save state and entries error: %v", err) + plog.Fatalf("raft save state and entries error: %v", err) } r.raftStorage.Append(rd.Entries) @@ -179,7 +178,7 @@ func (r *raftNode) stop() { r.Stop() r.transport.Stop() if err := r.storage.Close(); err != nil { - log.Panicf("etcdraft: close storage error: %v", err) + plog.Panicf("raft close storage error: %v", err) } close(r.done) } @@ -205,21 +204,21 @@ func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n r }, ) if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil { - log.Fatalf("etcdserver create snapshot directory error: %v", err) + plog.Fatalf("create snapshot directory error: %v", err) } if w, err = wal.Create(cfg.WALDir(), metadata); err != nil { - log.Fatalf("etcdserver: create wal error: %v", err) + plog.Fatalf("create wal error: %v", err) } peers := make([]raft.Peer, len(ids)) for i, id := range ids { ctx, err := json.Marshal((*cl).Member(id)) if err != nil { - log.Panicf("marshal member should never fail: %v", err) + plog.Panicf("marshal member should never fail: %v", err) } peers[i] = raft.Peer{ID: uint64(id), Context: ctx} } id = member.ID - log.Printf("etcdserver: start member %s in cluster %s", id, cl.ID()) + plog.Infof("starting member %s in cluster %s", id, cl.ID()) s = raft.NewMemoryStorage() c := &raft.Config{ ID: uint64(id), @@ -241,7 +240,7 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *clust } w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap) - log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cid, st.Commit) + plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit) cl := newCluster("") cl.SetID(cid) s := raft.NewMemoryStorage() @@ -273,7 +272,7 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type // discard the previously uncommitted entries for i, ent := range ents { if ent.Index > st.Commit { - log.Printf("etcdserver: discarding %d uncommitted WAL entries ", len(ents)-i) + plog.Infof("discarding %d uncommitted WAL entries ", len(ents)-i) ents = ents[:i] break } @@ -286,13 +285,13 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type // force commit newly appended entries err := w.Save(raftpb.HardState{}, toAppEnts) if err != nil { - log.Fatalf("etcdserver: %v", err) + plog.Fatalf("%v", err) } if len(ents) != 0 { st.Commit = ents[len(ents)-1].Index } - log.Printf("etcdserver: forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit) + plog.Printf("forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit) cl := newCluster("") cl.SetID(cid) s := raft.NewMemoryStorage() @@ -338,7 +337,7 @@ func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 { case raftpb.ConfChangeRemoveNode: delete(ids, cc.NodeID) default: - log.Panicf("ConfChange Type should be either ConfChangeAddNode or ConfChangeRemoveNode!") + plog.Panicf("ConfChange Type should be either ConfChangeAddNode or ConfChangeRemoveNode!") } } sids := make(types.Uint64Slice, 0) @@ -383,7 +382,7 @@ func createConfigChangeEnts(ids []uint64, self uint64, term, index uint64) []raf } ctx, err := json.Marshal(m) if err != nil { - log.Panicf("marshal member should never fail: %v", err) + plog.Panicf("marshal member should never fail: %v", err) } cc := &raftpb.ConfChange{ Type: raftpb.ConfChangeAddNode, diff --git a/etcdserver/server.go b/etcdserver/server.go index 4ed40cb78..4d421c535 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -18,7 +18,6 @@ import ( "encoding/json" "expvar" "fmt" - "log" "math/rand" "net/http" "path" @@ -27,6 +26,7 @@ import ( "time" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver" + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/discovery" "github.com/coreos/etcd/etcdserver/etcdhttp/httptypes" @@ -66,6 +66,8 @@ const ( ) var ( + plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcdserver") + storeMemberAttributeRegexp = regexp.MustCompile(path.Join(storeMembersPrefix, "[[:xdigit:]]{1,16}", attributesSuffix)) ) @@ -256,7 +258,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { } if cfg.ShouldDiscover() { - log.Printf("etcdserver: discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir()) + plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir()) } snapshot, err := ss.Load() if err != nil && err != snap.ErrNoSnapshot { @@ -264,13 +266,13 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { } if snapshot != nil { if err := st.Recovery(snapshot.Data); err != nil { - log.Panicf("etcdserver: recovered store from snapshot error: %v", err) + plog.Panicf("recovered store from snapshot error: %v", err) } - log.Printf("etcdserver: recovered store from snapshot at index %d", snapshot.Metadata.Index) + plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index) } cfg.Print() if snapshot != nil { - log.Printf("etcdserver: loaded cluster information from store: %s", cl) + plog.Infof("loaded cluster information from store: %s", cl) } if !cfg.ForceNewCluster { id, cl, n, s, w = restartNode(cfg, snapshot) @@ -344,16 +346,16 @@ func (s *EtcdServer) Start() { // This function is just used for testing. func (s *EtcdServer) start() { if s.snapCount == 0 { - log.Printf("etcdserver: set snapshot count to default %d", DefaultSnapCount) + plog.Infof("set snapshot count to default %d", DefaultSnapCount) s.snapCount = DefaultSnapCount } s.w = wait.New() s.done = make(chan struct{}) s.stop = make(chan struct{}) if s.ClusterVersion() != nil { - log.Printf("etcdserver: starting server... [version: %v, cluster version: %v]", version.Version, s.ClusterVersion()) + plog.Infof("starting server... [version: %v, cluster version: %v]", version.Version, s.ClusterVersion()) } else { - log.Printf("etcdserver: starting server... [version: %v, cluster version: to_be_decided]", version.Version) + plog.Infof("starting server... [version: %v, cluster version: to_be_decided]", version.Version) } // TODO: if this is an empty log, writes all peer infos // into the first entry @@ -370,9 +372,9 @@ func (s *EtcdServer) purgeFile() { } select { case e := <-werrc: - log.Fatalf("etcdserver: failed to purge wal file %v", e) + plog.Fatalf("failed to purge wal file %v", e) case e := <-serrc: - log.Fatalf("etcdserver: failed to purge snap file %v", e) + plog.Fatalf("failed to purge snap file %v", e) case <-s.done: return } @@ -386,7 +388,7 @@ func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { if s.cluster.IsIDRemoved(types.ID(m.From)) { - log.Printf("etcdserver: reject message from removed member %s", types.ID(m.From).String()) + plog.Warningf("reject message from removed member %s", types.ID(m.From).String()) return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member") } if m.Type == raftpb.MsgApp { @@ -406,7 +408,7 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) { func (s *EtcdServer) run() { snap, err := s.r.raftStorage.Snapshot() if err != nil { - log.Panicf("etcdserver: get snapshot from raft storage error: %v", err) + plog.Panicf("get snapshot from raft storage error: %v", err) } confState := snap.Metadata.ConfState snapi := snap.Metadata.Index @@ -428,12 +430,12 @@ func (s *EtcdServer) run() { // apply snapshot if !raft.IsEmptySnap(apply.snapshot) { if apply.snapshot.Metadata.Index <= appliedi { - log.Panicf("etcdserver: snapshot index [%d] should > appliedi[%d] + 1", + plog.Panicf("snapshot index [%d] should > appliedi[%d] + 1", apply.snapshot.Metadata.Index, appliedi) } if err := s.store.Recovery(apply.snapshot.Data); err != nil { - log.Panicf("recovery store error: %v", err) + plog.Panicf("recovery store error: %v", err) } s.cluster.Recover() @@ -449,14 +451,14 @@ func (s *EtcdServer) run() { appliedi = apply.snapshot.Metadata.Index snapi = appliedi confState = apply.snapshot.Metadata.ConfState - log.Printf("etcdserver: recovered from incoming snapshot at index %d", snapi) + plog.Infof("recovered from incoming snapshot at index %d", snapi) } // apply entries if len(apply.entries) != 0 { firsti := apply.entries[0].Index if firsti > appliedi+1 { - log.Panicf("etcdserver: first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, appliedi) + plog.Panicf("first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, appliedi) } var ents []raftpb.Entry if appliedi+1-firsti < uint64(len(apply.entries)) { @@ -474,13 +476,13 @@ func (s *EtcdServer) run() { // trigger snapshot if appliedi-snapi > s.snapCount { - log.Printf("etcdserver: start to snapshot (applied: %d, lastsnap: %d)", appliedi, snapi) + plog.Infof("start to snapshot (applied: %d, lastsnap: %d)", appliedi, snapi) s.snapshot(appliedi, confState) snapi = appliedi } case err := <-s.errorc: - log.Printf("etcdserver: %s", err) - log.Printf("etcdserver: the data-dir used by this member must be removed.") + plog.Errorf("%s", err) + plog.Infof("the data-dir used by this member must be removed.") return case <-s.stop: return @@ -650,7 +652,7 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error return err } if x != nil { - log.Panicf("return type should always be error") + plog.Panicf("return type should always be error") } return nil case <-ctx.Done(): @@ -688,7 +690,7 @@ func (s *EtcdServer) sync(timeout time.Duration) { func (s *EtcdServer) publish(retryInterval time.Duration) { b, err := json.Marshal(s.attributes) if err != nil { - log.Printf("etcdserver: json marshal error: %v", err) + plog.Panicf("json marshal error: %v", err) return } req := pb.Request{ @@ -703,13 +705,13 @@ func (s *EtcdServer) publish(retryInterval time.Duration) { cancel() switch err { case nil: - log.Printf("etcdserver: published %+v to cluster %s", s.attributes, s.cluster.ID()) + plog.Infof("published %+v to cluster %s", s.attributes, s.cluster.ID()) return case ErrStopped: - log.Printf("etcdserver: aborting publish because server is stopped") + plog.Infof("aborting publish because server is stopped") return default: - log.Printf("etcdserver: publish error: %v", err) + plog.Errorf("publish error: %v", err) } } } @@ -752,7 +754,7 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint shouldstop, err = s.applyConfChange(cc, confState) s.w.Trigger(cc.ID, err) default: - log.Panicf("entry type should be either EntryNormal or EntryConfChange") + plog.Panicf("entry type should be either EntryNormal or EntryConfChange") } atomic.StoreUint64(&s.r.index, e.Index) atomic.StoreUint64(&s.r.term, e.Term) @@ -792,7 +794,7 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response { id := mustParseMemberIDFromKey(path.Dir(r.Path)) var attr Attributes if err := json.Unmarshal([]byte(r.Val), &attr); err != nil { - log.Panicf("unmarshal %s should never fail: %v", r.Val, err) + plog.Panicf("unmarshal %s should never fail: %v", r.Val, err) } s.cluster.UpdateAttributes(id, attr) } @@ -832,17 +834,17 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con case raftpb.ConfChangeAddNode: m := new(Member) if err := json.Unmarshal(cc.Context, m); err != nil { - log.Panicf("unmarshal member should never fail: %v", err) + plog.Panicf("unmarshal member should never fail: %v", err) } if cc.NodeID != uint64(m.ID) { - log.Panicf("nodeID should always be equal to member ID") + plog.Panicf("nodeID should always be equal to member ID") } s.cluster.AddMember(m) if m.ID == s.id { - log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.cluster.ID()) + plog.Noticef("added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.cluster.ID()) } else { s.r.transport.AddPeer(m.ID, m.PeerURLs) - log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.cluster.ID()) + plog.Noticef("added member %s %v to cluster %s", m.ID, m.PeerURLs, s.cluster.ID()) } case raftpb.ConfChangeRemoveNode: id := types.ID(cc.NodeID) @@ -851,22 +853,22 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con return true, nil } else { s.r.transport.RemovePeer(id) - log.Printf("etcdserver: removed member %s from cluster %s", id, s.cluster.ID()) + plog.Noticef("removed member %s from cluster %s", id, s.cluster.ID()) } case raftpb.ConfChangeUpdateNode: m := new(Member) if err := json.Unmarshal(cc.Context, m); err != nil { - log.Panicf("unmarshal member should never fail: %v", err) + plog.Panicf("unmarshal member should never fail: %v", err) } if cc.NodeID != uint64(m.ID) { - log.Panicf("nodeID should always be equal to member ID") + plog.Panicf("nodeID should always be equal to member ID") } s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes) if m.ID == s.id { - log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.cluster.ID()) + plog.Noticef("update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.cluster.ID()) } else { s.r.transport.UpdatePeer(m.ID, m.PeerURLs) - log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.cluster.ID()) + plog.Noticef("update member %s %v in cluster %s", m.ID, m.PeerURLs, s.cluster.ID()) } } return false, nil @@ -881,7 +883,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { // TODO: current store will never fail to do a snapshot // what should we do if the store might fail? if err != nil { - log.Panicf("etcdserver: store save should never fail: %v", err) + plog.Panicf("store save should never fail: %v", err) } snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d) if err != nil { @@ -890,12 +892,12 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { if err == raft.ErrSnapOutOfDate { return } - log.Panicf("etcdserver: unexpected create snapshot error %v", err) + plog.Panicf("unexpected create snapshot error %v", err) } if err := s.r.storage.SaveSnap(snap); err != nil { - log.Fatalf("etcdserver: save snapshot error: %v", err) + plog.Fatalf("save snapshot error: %v", err) } - log.Printf("etcdserver: saved snapshot at index %d", snap.Metadata.Index) + plog.Infof("saved snapshot at index %d", snap.Metadata.Index) // keep some in memory log entries for slow followers. compacti := uint64(1) @@ -909,9 +911,9 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { if err == raft.ErrCompacted { return } - log.Panicf("etcdserver: unexpected compaction error %v", err) + plog.Panicf("unexpected compaction error %v", err) } - log.Printf("etcdserver: compacted raft log at %d", compacti) + plog.Info("compacted raft log at %d", compacti) }() } @@ -974,9 +976,9 @@ func (s *EtcdServer) monitorVersions() { func (s *EtcdServer) updateClusterVersion(ver string) { if s.cluster.Version() == nil { - log.Printf("etcdsever: setting up the initial cluster version to %v", ver) + plog.Infof("setting up the initial cluster version to %v", ver) } else { - log.Printf("etcdsever: updating the cluster version from %v to %v", s.cluster.Version(), ver) + plog.Infof("updating the cluster version from %v to %v", s.cluster.Version(), ver) } req := pb.Request{ Method: "PUT", @@ -990,9 +992,9 @@ func (s *EtcdServer) updateClusterVersion(ver string) { case nil: return case ErrStopped: - log.Printf("etcdserver: aborting update cluster version because server is stopped") + plog.Infof("aborting update cluster version because server is stopped") return default: - log.Printf("etcdserver: error updating cluster version (%v)", err) + plog.Errorf("error updating cluster version (%v)", err) } } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 8a922a8d6..1ecbd729d 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -17,10 +17,7 @@ package etcdserver import ( "encoding/json" "fmt" - "io/ioutil" - "log" "net/http" - "os" "path" "reflect" "strconv" @@ -1022,8 +1019,6 @@ func TestPublishStopped(t *testing.T) { // TestPublishRetry tests that publish will keep retry until success. func TestPublishRetry(t *testing.T) { - log.SetOutput(ioutil.Discard) - defer log.SetOutput(os.Stderr) n := &nodeRecorder{} srv := &EtcdServer{ r: raftNode{Node: n}, diff --git a/etcdserver/storage.go b/etcdserver/storage.go index 0dfd53218..7acc59767 100644 --- a/etcdserver/storage.go +++ b/etcdserver/storage.go @@ -16,7 +16,6 @@ package etcdserver import ( "io" - "log" "os" "path" @@ -81,18 +80,18 @@ func readWAL(waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, repaired := false for { if w, err = wal.Open(waldir, snap); err != nil { - log.Fatalf("etcdserver: open wal error: %v", err) + plog.Fatalf("open wal error: %v", err) } if wmetadata, st, ents, err = w.ReadAll(); err != nil { w.Close() // we can only repair ErrUnexpectedEOF and we never repair twice. if repaired || err != io.ErrUnexpectedEOF { - log.Fatalf("etcdserver: read wal error (%v) and cannot be repaired", err) + plog.Fatalf("read wal error (%v) and cannot be repaired", err) } if !wal.Repair(waldir) { - log.Fatalf("etcdserver: WAL error (%v) cannot be repaired", err) + plog.Fatalf("WAL error (%v) cannot be repaired", err) } else { - log.Printf("etcdserver: repaired WAL error (%v)", err) + plog.Infof("repaired WAL error (%v)", err) repaired = true } continue @@ -111,10 +110,10 @@ func readWAL(waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, func upgradeDataDir(baseDataDir string, name string, ver version.DataDirVersion) error { switch ver { case version.DataDir0_4: - log.Print("etcdserver: converting v0.4 log to v2.0") + plog.Infof("converting v0.4 log to v2.0") err := migrate.Migrate4To2(baseDataDir, name) if err != nil { - log.Fatalf("etcdserver: failed migrating data-dir: %v", err) + plog.Fatalf("failed to migrate data-dir (%v)", err) return err } fallthrough