etcdserver: use leveled logging

Leveled logging for etcdserver pkg.
This commit is contained in:
Xiang Li 2015-06-08 13:28:34 -07:00
parent e0d5116683
commit e0f9796653
9 changed files with 115 additions and 125 deletions

View File

@ -20,7 +20,6 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"log"
"path"
"reflect"
"sort"
@ -132,7 +131,7 @@ func (c *cluster) MemberByName(name string) *Member {
for _, m := range c.members {
if m.Name == name {
if memb != nil {
log.Panicf("two members with the given name %q exist", name)
plog.Panicf("two members with the given name %q exist", name)
}
memb = m
}
@ -245,7 +244,7 @@ func (c *cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
}
m := new(Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
log.Panicf("unmarshal member should never fail: %v", err)
plog.Panicf("unmarshal member should never fail: %v", err)
}
for _, u := range m.PeerURLs {
if urls[u] {
@ -271,7 +270,7 @@ func (c *cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
}
m := new(Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
log.Panicf("unmarshal member should never fail: %v", err)
plog.Panicf("unmarshal member should never fail: %v", err)
}
for _, u := range m.PeerURLs {
if urls[u] {
@ -279,7 +278,7 @@ func (c *cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
}
}
default:
log.Panicf("ConfChange type should be either AddNode, RemoveNode or UpdateNode")
plog.Panicf("ConfChange type should be either AddNode, RemoveNode or UpdateNode")
}
return nil
}
@ -292,11 +291,11 @@ func (c *cluster) AddMember(m *Member) {
defer c.Unlock()
b, err := json.Marshal(m.RaftAttributes)
if err != nil {
log.Panicf("marshal raftAttributes should never fail: %v", err)
plog.Panicf("marshal raftAttributes should never fail: %v", err)
}
p := path.Join(memberStoreKey(m.ID), raftAttributesSuffix)
if _, err := c.store.Create(p, false, string(b), false, store.Permanent); err != nil {
log.Panicf("create raftAttributes should never fail: %v", err)
plog.Panicf("create raftAttributes should never fail: %v", err)
}
c.members[m.ID] = m
}
@ -307,11 +306,11 @@ func (c *cluster) RemoveMember(id types.ID) {
c.Lock()
defer c.Unlock()
if _, err := c.store.Delete(memberStoreKey(id), true, true); err != nil {
log.Panicf("delete member should never fail: %v", err)
plog.Panicf("delete member should never fail: %v", err)
}
delete(c.members, id)
if _, err := c.store.Create(removedMemberStoreKey(id), false, "", false, store.Permanent); err != nil {
log.Panicf("create removedMember should never fail: %v", err)
plog.Panicf("create removedMember should never fail: %v", err)
}
c.removed[id] = true
}
@ -328,11 +327,11 @@ func (c *cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
defer c.Unlock()
b, err := json.Marshal(raftAttr)
if err != nil {
log.Panicf("marshal raftAttributes should never fail: %v", err)
plog.Panicf("marshal raftAttributes should never fail: %v", err)
}
p := path.Join(memberStoreKey(id), raftAttributesSuffix)
if _, err := c.store.Update(p, string(b), store.Permanent); err != nil {
log.Panicf("update raftAttributes should never fail: %v", err)
plog.Panicf("update raftAttributes should never fail: %v", err)
}
c.members[id].RaftAttributes = raftAttr
}
@ -350,9 +349,9 @@ func (c *cluster) SetVersion(ver *semver.Version) {
c.Lock()
defer c.Unlock()
if c.version != nil {
log.Printf("etcdsever: updated the cluster version from %v to %v", c.version.String(), ver.String())
plog.Noticef("updated the cluster version from %v to %v", c.version.String(), ver.String())
} else {
log.Printf("etcdsever: set the initial cluster version to %v", ver.String())
plog.Noticef("set the initial cluster version to %v", ver.String())
}
c.version = ver
}
@ -365,12 +364,12 @@ func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool)
if isKeyNotFound(err) {
return members, removed
}
log.Panicf("get storeMembers should never fail: %v", err)
plog.Panicf("get storeMembers should never fail: %v", err)
}
for _, n := range e.Node.Nodes {
m, err := nodeToMember(n)
if err != nil {
log.Panicf("nodeToMember should never fail: %v", err)
plog.Panicf("nodeToMember should never fail: %v", err)
}
members[m.ID] = m
}
@ -380,7 +379,7 @@ func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool)
if isKeyNotFound(err) {
return members, removed
}
log.Panicf("get storeRemovedMembers should never fail: %v", err)
plog.Panicf("get storeRemovedMembers should never fail: %v", err)
}
for _, n := range e.Node.Nodes {
removed[mustParseMemberIDFromKey(n.Key)] = true
@ -394,7 +393,7 @@ func clusterVersionFromStore(st store.Store) *semver.Version {
if isKeyNotFound(err) {
return nil
}
log.Panicf("etcdserver: unexpected error (%v) when getting cluster version from store", err)
plog.Panicf("unexpected error (%v) when getting cluster version from store", err)
}
return semver.Must(semver.NewVersion(*e.Node.Value))
}

View File

@ -18,7 +18,6 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"sort"
"time"
@ -65,34 +64,34 @@ func getClusterFromRemotePeers(urls []string, logerr bool, tr *http.Transport) (
resp, err := cc.Get(u + "/members")
if err != nil {
if logerr {
log.Printf("etcdserver: could not get cluster response from %s: %v", u, err)
plog.Warningf("could not get cluster response from %s: %v", u, err)
}
continue
}
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
if logerr {
log.Printf("etcdserver: could not read the body of cluster response: %v", err)
plog.Warningf("could not read the body of cluster response: %v", err)
}
continue
}
var membs []*Member
if err := json.Unmarshal(b, &membs); err != nil {
if logerr {
log.Printf("etcdserver: could not unmarshal cluster response: %v", err)
plog.Warningf("could not unmarshal cluster response: %v", err)
}
continue
}
id, err := types.IDFromString(resp.Header.Get("X-Etcd-Cluster-ID"))
if err != nil {
if logerr {
log.Printf("etcdserver: could not parse the cluster ID from cluster res: %v", err)
plog.Warningf("could not parse the cluster ID from cluster res: %v", err)
}
continue
}
return newClusterFromMembers("", id, membs), nil
}
return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls")
return nil, fmt.Errorf("could not retrieve cluster information from the given urls")
}
// getRemotePeerURLs returns peer urls of remote members in the cluster. The
@ -127,7 +126,7 @@ func getVersions(cl Cluster, local types.ID, tr *http.Transport) map[string]*ver
}
ver, err := getVersion(m, tr)
if err != nil {
log.Printf("etcdserver: cannot get the version of member %s (%v)", m.ID, err)
plog.Warningf("cannot get the version of member %s (%v)", m.ID, err)
vers[m.ID.String()] = nil
} else {
vers[m.ID.String()] = ver
@ -149,12 +148,12 @@ func decideClusterVersion(vers map[string]*version.Versions) *semver.Version {
}
v, err := semver.NewVersion(ver.Server)
if err != nil {
log.Printf("etcdserver: cannot understand the version of member %s (%v)", mid, err)
plog.Errorf("cannot understand the version of member %s (%v)", mid, err)
return nil
}
if lv.LessThan(*v) {
log.Printf("etcdserver: the etcd version %s is not up-to-date", lv.String())
log.Printf("etcdserver: member %s has a higher version %s", mid, ver)
plog.Warningf("the etcd version %s is not up-to-date", lv.String())
plog.Warningf("member %s has a higher version %s", mid, ver)
}
if cv == nil {
cv = v
@ -195,15 +194,15 @@ func isCompatibleWithVers(vers map[string]*version.Versions, local types.ID, min
}
clusterv, err := semver.NewVersion(v.Cluster)
if err != nil {
log.Printf("etcdserver: cannot understand the cluster version of member %s (%v)", id, err)
plog.Errorf("cannot understand the cluster version of member %s (%v)", id, err)
continue
}
if clusterv.LessThan(*minV) {
log.Printf("etcdserver: the running cluster version(%v) is lower than the minimal cluster version(%v) supported", clusterv.String(), minV.String())
plog.Warningf("the running cluster version(%v) is lower than the minimal cluster version(%v) supported", clusterv.String(), minV.String())
return false
}
if maxV.LessThan(*clusterv) {
log.Printf("etcdserver: the running cluster version(%v) is higher than the maximum cluster version(%v) supported", clusterv.String(), maxV.String())
plog.Warningf("the running cluster version(%v) is higher than the maximum cluster version(%v) supported", clusterv.String(), maxV.String())
return false
}
ok = true
@ -226,7 +225,7 @@ func getVersion(m *Member, tr *http.Transport) (*version.Versions, error) {
for _, u := range m.PeerURLs {
resp, err = cc.Get(u + "/version")
if err != nil {
log.Printf("etcdserver: failed to reach the peerURL(%s) of member %s (%v)", u, m.ID, err)
plog.Warningf("failed to reach the peerURL(%s) of member %s (%v)", u, m.ID, err)
continue
}
// etcd 2.0 does not have version endpoint on peer url.
@ -241,12 +240,12 @@ func getVersion(m *Member, tr *http.Transport) (*version.Versions, error) {
b, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
log.Printf("etcdserver: failed to read out the response body from the peerURL(%s) of member %s (%v)", u, m.ID, err)
plog.Warningf("failed to read out the response body from the peerURL(%s) of member %s (%v)", u, m.ID, err)
continue
}
var vers version.Versions
if err := json.Unmarshal(b, &vers); err != nil {
log.Printf("etcdserver: failed to unmarshal the response body got from the peerURL(%s) of member %s (%v)", u, m.ID, err)
plog.Warningf("failed to unmarshal the response body got from the peerURL(%s) of member %s (%v)", u, m.ID, err)
continue
}
return &vers, nil

View File

@ -16,7 +16,6 @@ package etcdserver
import (
"fmt"
"log"
"net/http"
"path"
"reflect"
@ -114,25 +113,25 @@ func (c *ServerConfig) PrintWithInitial() { c.print(true) }
func (c *ServerConfig) Print() { c.print(false) }
func (c *ServerConfig) print(initial bool) {
log.Printf("etcdserver: name = %s", c.Name)
plog.Infof("name = %s", c.Name)
if c.ForceNewCluster {
log.Println("etcdserver: force new cluster")
plog.Infof("force new cluster")
}
log.Printf("etcdserver: data dir = %s", c.DataDir)
log.Printf("etcdserver: member dir = %s", c.MemberDir())
log.Printf("etcdserver: heartbeat = %dms", c.TickMs)
log.Printf("etcdserver: election = %dms", c.ElectionTicks*int(c.TickMs))
log.Printf("etcdserver: snapshot count = %d", c.SnapCount)
plog.Infof("data dir = %s", c.DataDir)
plog.Infof("member dir = %s", c.MemberDir())
plog.Infof("heartbeat = %dms", c.TickMs)
plog.Infof("election = %dms", c.ElectionTicks*int(c.TickMs))
plog.Infof("snapshot count = %d", c.SnapCount)
if len(c.DiscoveryURL) != 0 {
log.Printf("etcdserver: discovery URL= %s", c.DiscoveryURL)
plog.Infof("discovery URL= %s", c.DiscoveryURL)
if len(c.DiscoveryProxy) != 0 {
log.Printf("etcdserver: discovery proxy = %s", c.DiscoveryProxy)
plog.Infof("discovery proxy = %s", c.DiscoveryProxy)
}
}
log.Printf("etcdserver: advertise client URLs = %s", c.ClientURLs)
plog.Infof("advertise client URLs = %s", c.ClientURLs)
if initial {
log.Printf("etcdserver: initial advertise peer URLs = %s", c.PeerURLs)
log.Printf("etcdserver: initial cluster = %s", c.InitialPeerURLsMap)
plog.Infof("initial advertise peer URLs = %s", c.PeerURLs)
plog.Infof("initial cluster = %s", c.InitialPeerURLsMap)
}
}

View File

@ -19,7 +19,6 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"log"
"math/rand"
"path"
"sort"
@ -80,7 +79,7 @@ func NewMember(name string, peerURLs types.URLs, clusterName string, now *time.T
// It will panic if there is no PeerURLs available in Member.
func (m *Member) PickPeerURL() string {
if len(m.PeerURLs) == 0 {
log.Panicf("member should always have some peer url")
plog.Panicf("member should always have some peer url")
}
return m.PeerURLs[rand.Intn(len(m.PeerURLs))]
}
@ -117,7 +116,7 @@ func MemberAttributesStorePath(id types.ID) string {
func mustParseMemberIDFromKey(key string) types.ID {
id, err := types.IDFromString(path.Base(key))
if err != nil {
log.Panicf("unexpected parse member id error: %v", err)
plog.Panicf("unexpected parse member id error: %v", err)
}
return id
}

View File

@ -15,7 +15,6 @@
package etcdserver
import (
"log"
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"
@ -66,17 +65,17 @@ func monitorFileDescriptor(done <-chan struct{}) {
for {
used, err := runtime.FDUsage()
if err != nil {
log.Printf("etcdserver: cannot monitor file descriptor usage (%v)", err)
plog.Errorf("cannot monitor file descriptor usage (%v)", err)
return
}
fileDescriptorUsed.Set(float64(used))
limit, err := runtime.FDLimit()
if err != nil {
log.Printf("etcdserver: cannot monitor file descriptor usage (%v)", err)
plog.Errorf("cannot monitor file descriptor usage (%v)", err)
return
}
if used >= limit/5*4 {
log.Printf("etcdserver: 80%% of the file descriptor limit is used [used = %d, limit = %d]", used, limit)
plog.Warningf("80%% of the file descriptor limit is used [used = %d, limit = %d]", used, limit)
}
select {
case <-ticker.C:

View File

@ -17,7 +17,6 @@ package etcdserver
import (
"encoding/json"
"expvar"
"log"
"os"
"sort"
"sync/atomic"
@ -149,13 +148,13 @@ func (r *raftNode) run() {
if !raft.IsEmptySnap(rd.Snapshot) {
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
log.Fatalf("etcdraft: save snapshot error: %v", err)
plog.Fatalf("raft save snapshot error: %v", err)
}
r.raftStorage.ApplySnapshot(rd.Snapshot)
log.Printf("etcdraft: applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
}
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
log.Fatalf("etcdraft: save state and entries error: %v", err)
plog.Fatalf("raft save state and entries error: %v", err)
}
r.raftStorage.Append(rd.Entries)
@ -179,7 +178,7 @@ func (r *raftNode) stop() {
r.Stop()
r.transport.Stop()
if err := r.storage.Close(); err != nil {
log.Panicf("etcdraft: close storage error: %v", err)
plog.Panicf("raft close storage error: %v", err)
}
close(r.done)
}
@ -205,21 +204,21 @@ func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n r
},
)
if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil {
log.Fatalf("etcdserver create snapshot directory error: %v", err)
plog.Fatalf("create snapshot directory error: %v", err)
}
if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
log.Fatalf("etcdserver: create wal error: %v", err)
plog.Fatalf("create wal error: %v", err)
}
peers := make([]raft.Peer, len(ids))
for i, id := range ids {
ctx, err := json.Marshal((*cl).Member(id))
if err != nil {
log.Panicf("marshal member should never fail: %v", err)
plog.Panicf("marshal member should never fail: %v", err)
}
peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
}
id = member.ID
log.Printf("etcdserver: start member %s in cluster %s", id, cl.ID())
plog.Infof("starting member %s in cluster %s", id, cl.ID())
s = raft.NewMemoryStorage()
c := &raft.Config{
ID: uint64(id),
@ -241,7 +240,7 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *clust
}
w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cid, st.Commit)
plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit)
cl := newCluster("")
cl.SetID(cid)
s := raft.NewMemoryStorage()
@ -273,7 +272,7 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type
// discard the previously uncommitted entries
for i, ent := range ents {
if ent.Index > st.Commit {
log.Printf("etcdserver: discarding %d uncommitted WAL entries ", len(ents)-i)
plog.Infof("discarding %d uncommitted WAL entries ", len(ents)-i)
ents = ents[:i]
break
}
@ -286,13 +285,13 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type
// force commit newly appended entries
err := w.Save(raftpb.HardState{}, toAppEnts)
if err != nil {
log.Fatalf("etcdserver: %v", err)
plog.Fatalf("%v", err)
}
if len(ents) != 0 {
st.Commit = ents[len(ents)-1].Index
}
log.Printf("etcdserver: forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit)
plog.Printf("forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit)
cl := newCluster("")
cl.SetID(cid)
s := raft.NewMemoryStorage()
@ -338,7 +337,7 @@ func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
case raftpb.ConfChangeRemoveNode:
delete(ids, cc.NodeID)
default:
log.Panicf("ConfChange Type should be either ConfChangeAddNode or ConfChangeRemoveNode!")
plog.Panicf("ConfChange Type should be either ConfChangeAddNode or ConfChangeRemoveNode!")
}
}
sids := make(types.Uint64Slice, 0)
@ -383,7 +382,7 @@ func createConfigChangeEnts(ids []uint64, self uint64, term, index uint64) []raf
}
ctx, err := json.Marshal(m)
if err != nil {
log.Panicf("marshal member should never fail: %v", err)
plog.Panicf("marshal member should never fail: %v", err)
}
cc := &raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,

View File

@ -18,7 +18,6 @@ import (
"encoding/json"
"expvar"
"fmt"
"log"
"math/rand"
"net/http"
"path"
@ -27,6 +26,7 @@ import (
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/discovery"
"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
@ -66,6 +66,8 @@ const (
)
var (
plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcdserver")
storeMemberAttributeRegexp = regexp.MustCompile(path.Join(storeMembersPrefix, "[[:xdigit:]]{1,16}", attributesSuffix))
)
@ -256,7 +258,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
}
if cfg.ShouldDiscover() {
log.Printf("etcdserver: discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
}
snapshot, err := ss.Load()
if err != nil && err != snap.ErrNoSnapshot {
@ -264,13 +266,13 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
}
if snapshot != nil {
if err := st.Recovery(snapshot.Data); err != nil {
log.Panicf("etcdserver: recovered store from snapshot error: %v", err)
plog.Panicf("recovered store from snapshot error: %v", err)
}
log.Printf("etcdserver: recovered store from snapshot at index %d", snapshot.Metadata.Index)
plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index)
}
cfg.Print()
if snapshot != nil {
log.Printf("etcdserver: loaded cluster information from store: %s", cl)
plog.Infof("loaded cluster information from store: %s", cl)
}
if !cfg.ForceNewCluster {
id, cl, n, s, w = restartNode(cfg, snapshot)
@ -344,16 +346,16 @@ func (s *EtcdServer) Start() {
// This function is just used for testing.
func (s *EtcdServer) start() {
if s.snapCount == 0 {
log.Printf("etcdserver: set snapshot count to default %d", DefaultSnapCount)
plog.Infof("set snapshot count to default %d", DefaultSnapCount)
s.snapCount = DefaultSnapCount
}
s.w = wait.New()
s.done = make(chan struct{})
s.stop = make(chan struct{})
if s.ClusterVersion() != nil {
log.Printf("etcdserver: starting server... [version: %v, cluster version: %v]", version.Version, s.ClusterVersion())
plog.Infof("starting server... [version: %v, cluster version: %v]", version.Version, s.ClusterVersion())
} else {
log.Printf("etcdserver: starting server... [version: %v, cluster version: to_be_decided]", version.Version)
plog.Infof("starting server... [version: %v, cluster version: to_be_decided]", version.Version)
}
// TODO: if this is an empty log, writes all peer infos
// into the first entry
@ -370,9 +372,9 @@ func (s *EtcdServer) purgeFile() {
}
select {
case e := <-werrc:
log.Fatalf("etcdserver: failed to purge wal file %v", e)
plog.Fatalf("failed to purge wal file %v", e)
case e := <-serrc:
log.Fatalf("etcdserver: failed to purge snap file %v", e)
plog.Fatalf("failed to purge snap file %v", e)
case <-s.done:
return
}
@ -386,7 +388,7 @@ func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler()
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
if s.cluster.IsIDRemoved(types.ID(m.From)) {
log.Printf("etcdserver: reject message from removed member %s", types.ID(m.From).String())
plog.Warningf("reject message from removed member %s", types.ID(m.From).String())
return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member")
}
if m.Type == raftpb.MsgApp {
@ -406,7 +408,7 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
func (s *EtcdServer) run() {
snap, err := s.r.raftStorage.Snapshot()
if err != nil {
log.Panicf("etcdserver: get snapshot from raft storage error: %v", err)
plog.Panicf("get snapshot from raft storage error: %v", err)
}
confState := snap.Metadata.ConfState
snapi := snap.Metadata.Index
@ -428,12 +430,12 @@ func (s *EtcdServer) run() {
// apply snapshot
if !raft.IsEmptySnap(apply.snapshot) {
if apply.snapshot.Metadata.Index <= appliedi {
log.Panicf("etcdserver: snapshot index [%d] should > appliedi[%d] + 1",
plog.Panicf("snapshot index [%d] should > appliedi[%d] + 1",
apply.snapshot.Metadata.Index, appliedi)
}
if err := s.store.Recovery(apply.snapshot.Data); err != nil {
log.Panicf("recovery store error: %v", err)
plog.Panicf("recovery store error: %v", err)
}
s.cluster.Recover()
@ -449,14 +451,14 @@ func (s *EtcdServer) run() {
appliedi = apply.snapshot.Metadata.Index
snapi = appliedi
confState = apply.snapshot.Metadata.ConfState
log.Printf("etcdserver: recovered from incoming snapshot at index %d", snapi)
plog.Infof("recovered from incoming snapshot at index %d", snapi)
}
// apply entries
if len(apply.entries) != 0 {
firsti := apply.entries[0].Index
if firsti > appliedi+1 {
log.Panicf("etcdserver: first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, appliedi)
plog.Panicf("first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, appliedi)
}
var ents []raftpb.Entry
if appliedi+1-firsti < uint64(len(apply.entries)) {
@ -474,13 +476,13 @@ func (s *EtcdServer) run() {
// trigger snapshot
if appliedi-snapi > s.snapCount {
log.Printf("etcdserver: start to snapshot (applied: %d, lastsnap: %d)", appliedi, snapi)
plog.Infof("start to snapshot (applied: %d, lastsnap: %d)", appliedi, snapi)
s.snapshot(appliedi, confState)
snapi = appliedi
}
case err := <-s.errorc:
log.Printf("etcdserver: %s", err)
log.Printf("etcdserver: the data-dir used by this member must be removed.")
plog.Errorf("%s", err)
plog.Infof("the data-dir used by this member must be removed.")
return
case <-s.stop:
return
@ -650,7 +652,7 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error
return err
}
if x != nil {
log.Panicf("return type should always be error")
plog.Panicf("return type should always be error")
}
return nil
case <-ctx.Done():
@ -688,7 +690,7 @@ func (s *EtcdServer) sync(timeout time.Duration) {
func (s *EtcdServer) publish(retryInterval time.Duration) {
b, err := json.Marshal(s.attributes)
if err != nil {
log.Printf("etcdserver: json marshal error: %v", err)
plog.Panicf("json marshal error: %v", err)
return
}
req := pb.Request{
@ -703,13 +705,13 @@ func (s *EtcdServer) publish(retryInterval time.Duration) {
cancel()
switch err {
case nil:
log.Printf("etcdserver: published %+v to cluster %s", s.attributes, s.cluster.ID())
plog.Infof("published %+v to cluster %s", s.attributes, s.cluster.ID())
return
case ErrStopped:
log.Printf("etcdserver: aborting publish because server is stopped")
plog.Infof("aborting publish because server is stopped")
return
default:
log.Printf("etcdserver: publish error: %v", err)
plog.Errorf("publish error: %v", err)
}
}
}
@ -752,7 +754,7 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
shouldstop, err = s.applyConfChange(cc, confState)
s.w.Trigger(cc.ID, err)
default:
log.Panicf("entry type should be either EntryNormal or EntryConfChange")
plog.Panicf("entry type should be either EntryNormal or EntryConfChange")
}
atomic.StoreUint64(&s.r.index, e.Index)
atomic.StoreUint64(&s.r.term, e.Term)
@ -792,7 +794,7 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
id := mustParseMemberIDFromKey(path.Dir(r.Path))
var attr Attributes
if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
log.Panicf("unmarshal %s should never fail: %v", r.Val, err)
plog.Panicf("unmarshal %s should never fail: %v", r.Val, err)
}
s.cluster.UpdateAttributes(id, attr)
}
@ -832,17 +834,17 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
case raftpb.ConfChangeAddNode:
m := new(Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
log.Panicf("unmarshal member should never fail: %v", err)
plog.Panicf("unmarshal member should never fail: %v", err)
}
if cc.NodeID != uint64(m.ID) {
log.Panicf("nodeID should always be equal to member ID")
plog.Panicf("nodeID should always be equal to member ID")
}
s.cluster.AddMember(m)
if m.ID == s.id {
log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.cluster.ID())
plog.Noticef("added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.cluster.ID())
} else {
s.r.transport.AddPeer(m.ID, m.PeerURLs)
log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.cluster.ID())
plog.Noticef("added member %s %v to cluster %s", m.ID, m.PeerURLs, s.cluster.ID())
}
case raftpb.ConfChangeRemoveNode:
id := types.ID(cc.NodeID)
@ -851,22 +853,22 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
return true, nil
} else {
s.r.transport.RemovePeer(id)
log.Printf("etcdserver: removed member %s from cluster %s", id, s.cluster.ID())
plog.Noticef("removed member %s from cluster %s", id, s.cluster.ID())
}
case raftpb.ConfChangeUpdateNode:
m := new(Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
log.Panicf("unmarshal member should never fail: %v", err)
plog.Panicf("unmarshal member should never fail: %v", err)
}
if cc.NodeID != uint64(m.ID) {
log.Panicf("nodeID should always be equal to member ID")
plog.Panicf("nodeID should always be equal to member ID")
}
s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes)
if m.ID == s.id {
log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.cluster.ID())
plog.Noticef("update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.cluster.ID())
} else {
s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.cluster.ID())
plog.Noticef("update member %s %v in cluster %s", m.ID, m.PeerURLs, s.cluster.ID())
}
}
return false, nil
@ -881,7 +883,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
// TODO: current store will never fail to do a snapshot
// what should we do if the store might fail?
if err != nil {
log.Panicf("etcdserver: store save should never fail: %v", err)
plog.Panicf("store save should never fail: %v", err)
}
snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
if err != nil {
@ -890,12 +892,12 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
if err == raft.ErrSnapOutOfDate {
return
}
log.Panicf("etcdserver: unexpected create snapshot error %v", err)
plog.Panicf("unexpected create snapshot error %v", err)
}
if err := s.r.storage.SaveSnap(snap); err != nil {
log.Fatalf("etcdserver: save snapshot error: %v", err)
plog.Fatalf("save snapshot error: %v", err)
}
log.Printf("etcdserver: saved snapshot at index %d", snap.Metadata.Index)
plog.Infof("saved snapshot at index %d", snap.Metadata.Index)
// keep some in memory log entries for slow followers.
compacti := uint64(1)
@ -909,9 +911,9 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
if err == raft.ErrCompacted {
return
}
log.Panicf("etcdserver: unexpected compaction error %v", err)
plog.Panicf("unexpected compaction error %v", err)
}
log.Printf("etcdserver: compacted raft log at %d", compacti)
plog.Info("compacted raft log at %d", compacti)
}()
}
@ -974,9 +976,9 @@ func (s *EtcdServer) monitorVersions() {
func (s *EtcdServer) updateClusterVersion(ver string) {
if s.cluster.Version() == nil {
log.Printf("etcdsever: setting up the initial cluster version to %v", ver)
plog.Infof("setting up the initial cluster version to %v", ver)
} else {
log.Printf("etcdsever: updating the cluster version from %v to %v", s.cluster.Version(), ver)
plog.Infof("updating the cluster version from %v to %v", s.cluster.Version(), ver)
}
req := pb.Request{
Method: "PUT",
@ -990,9 +992,9 @@ func (s *EtcdServer) updateClusterVersion(ver string) {
case nil:
return
case ErrStopped:
log.Printf("etcdserver: aborting update cluster version because server is stopped")
plog.Infof("aborting update cluster version because server is stopped")
return
default:
log.Printf("etcdserver: error updating cluster version (%v)", err)
plog.Errorf("error updating cluster version (%v)", err)
}
}

View File

@ -17,10 +17,7 @@ package etcdserver
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"path"
"reflect"
"strconv"
@ -1022,8 +1019,6 @@ func TestPublishStopped(t *testing.T) {
// TestPublishRetry tests that publish will keep retry until success.
func TestPublishRetry(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stderr)
n := &nodeRecorder{}
srv := &EtcdServer{
r: raftNode{Node: n},

View File

@ -16,7 +16,6 @@ package etcdserver
import (
"io"
"log"
"os"
"path"
@ -81,18 +80,18 @@ func readWAL(waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID,
repaired := false
for {
if w, err = wal.Open(waldir, snap); err != nil {
log.Fatalf("etcdserver: open wal error: %v", err)
plog.Fatalf("open wal error: %v", err)
}
if wmetadata, st, ents, err = w.ReadAll(); err != nil {
w.Close()
// we can only repair ErrUnexpectedEOF and we never repair twice.
if repaired || err != io.ErrUnexpectedEOF {
log.Fatalf("etcdserver: read wal error (%v) and cannot be repaired", err)
plog.Fatalf("read wal error (%v) and cannot be repaired", err)
}
if !wal.Repair(waldir) {
log.Fatalf("etcdserver: WAL error (%v) cannot be repaired", err)
plog.Fatalf("WAL error (%v) cannot be repaired", err)
} else {
log.Printf("etcdserver: repaired WAL error (%v)", err)
plog.Infof("repaired WAL error (%v)", err)
repaired = true
}
continue
@ -111,10 +110,10 @@ func readWAL(waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID,
func upgradeDataDir(baseDataDir string, name string, ver version.DataDirVersion) error {
switch ver {
case version.DataDir0_4:
log.Print("etcdserver: converting v0.4 log to v2.0")
plog.Infof("converting v0.4 log to v2.0")
err := migrate.Migrate4To2(baseDataDir, name)
if err != nil {
log.Fatalf("etcdserver: failed migrating data-dir: %v", err)
plog.Fatalf("failed to migrate data-dir (%v)", err)
return err
}
fallthrough