From e7bc7becf396c1e43bc71f563a983a5cb5cdf482 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Wed, 23 Jul 2014 16:37:59 -0700 Subject: [PATCH] server: add first level logging --- etcd/discovery.go | 7 +++---- etcd/etcd.go | 14 +++++++++++-- etcd/participant.go | 45 ++++++++++++++++++++++-------------------- etcd/peer.go | 2 +- etcd/raft_handler.go | 4 ++-- etcd/standby.go | 5 +++-- etcd/v2_apply.go | 4 ++-- etcd/v2_client.go | 3 --- etcd/v2_http.go | 6 ++---- etcd/v2_http_delete.go | 3 --- etcd/v2_http_post.go | 2 -- etcd/v2_http_put.go | 5 ----- 12 files changed, 49 insertions(+), 51 deletions(-) diff --git a/etcd/discovery.go b/etcd/discovery.go index e73fc3e00..9c92321db 100644 --- a/etcd/discovery.go +++ b/etcd/discovery.go @@ -53,7 +53,6 @@ func newDiscoverer(u *url.URL, name, raftPubAddr string) *discoverer { u.Path = "" // Connect to a scheme://host not a full URL with path - log.Printf("Discovery via %s using prefix %s.\n", u.String(), d.prefix) d.client = etcd.NewClient([]string{u.String()}) if !strings.HasPrefix(oldPath, "/v2/keys") { @@ -63,6 +62,8 @@ func newDiscoverer(u *url.URL, name, raftPubAddr string) *discoverer { } func (d *discoverer) discover() ([]string, error) { + log.Printf("discoverer name=%s target=\"%q\" prefix=%s\n", d.name, d.client.GetCluster(), d.prefix) + if _, err := d.client.Set(path.Join(d.prefix, d.name), d.addr, defaultTTL); err != nil { return nil, fmt.Errorf("discovery service error: %v", err) } @@ -79,7 +80,6 @@ func (d *discoverer) discover() ([]string, error) { // If we got a response then the CAS was successful, we are leader if resp != nil && resp.Node.Value == startedState { // We are the leader, we have no peers - log.Println("Discovery _state was empty, so this machine is the initial leader.") return nil, nil } @@ -111,7 +111,6 @@ func (d *discoverer) findPeers() (peers []string, err error) { return nil, errors.New("Discovery found an initialized cluster but no reachable peers are registered.") } - 
log.Printf("Discovery found peers %v\n", peers) return } @@ -122,7 +121,7 @@ func (d *discoverer) heartbeat(stopc <-chan struct{}) { defer ticker.Stop() for { if _, err := d.client.Set(path.Join(d.prefix, d.name), d.addr, defaultTTL); err != nil { - log.Println("Discovery heartbeat failed: %v", err) + log.Printf("discoverer heartbeatErr=\"%v\"\n", err) } select { diff --git a/etcd/etcd.go b/etcd/etcd.go index a166a326a..37732a448 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -51,11 +51,12 @@ type Server struct { stopped bool mu sync.Mutex stopc chan struct{} + log *log.Logger } func New(c *config.Config) *Server { if err := c.Sanitize(); err != nil { - log.Fatalf("failed sanitizing configuration: %v", err) + log.Fatalf("server.new sanitizeErr=\"%v\"\n", err) } tc := &tls.Config{ @@ -65,7 +66,7 @@ func New(c *config.Config) *Server { if c.PeerTLSInfo().Scheme() == "https" { tc, err = c.PeerTLSInfo().ClientConfig() if err != nil { - log.Fatal("failed to create raft transporter tls:", err) + log.Fatalf("server.new ClientConfigErr=\"%v\"\n", err) } } @@ -87,12 +88,14 @@ func New(c *config.Config) *Server { stopc: make(chan struct{}), } + log.Printf("server.new id=%x raftPubAddr=%s\n", s.id, s.raftPubAddr) return s } func (s *Server) SetTick(tick time.Duration) { s.tickDuration = tick + log.Printf("server.setTick id=%x tick=%q\n", s.id, s.tickDuration) } // Stop stops the server elegently. 
@@ -112,6 +115,7 @@ func (s *Server) Stop() { <-s.stopc s.client.CloseConnections() s.peerHub.stop() + log.Printf("server.stop id=%x\n", s.id) } func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -151,8 +155,10 @@ func (s *Server) Run() error { if seeds, err = d.discover(); err != nil { return err } + log.Printf("server.run id=%x source=-discovery seeds=\"%v\"\n", s.id, seeds) } else { seeds = s.config.Peers + log.Printf("server.run id=%x source=-peers seeds=\"%v\"\n", s.id, seeds) } s.peerHub.setSeeds(seeds) @@ -170,6 +176,7 @@ func (s *Server) Run() error { go d.heartbeat(dStopc) } s.mode.Set(participantMode) + log.Printf("server.run id=%x mode=participantMode\n", s.id) s.mu.Unlock() next = s.p.run() if d != nil { @@ -178,10 +185,12 @@ func (s *Server) Run() error { case standbyMode: s.s = newStandby(s.client, s.peerHub) s.mode.Set(standbyMode) + log.Printf("server.run id=%x mode=standbyMode\n", s.id) s.mu.Unlock() next = s.s.run() case stopMode: s.mode.Set(stopMode) + log.Printf("server.run id=%x mode=stopMode\n", s.id) s.mu.Unlock() s.stopc <- struct{}{} return nil @@ -194,5 +203,6 @@ func (s *Server) Run() error { // setId sets the id for the participant. This should only be used for testing. 
func (s *Server) setId(id int64) { + log.Printf("server.setId id=%x oldId=%x\n", id, s.id) s.id = id } diff --git a/etcd/participant.go b/etcd/participant.go index d393b0d8d..6f8db7f55 100644 --- a/etcd/participant.go +++ b/etcd/participant.go @@ -121,13 +121,13 @@ func newParticipant(id int64, pubAddr string, raftPubAddr string, client *v2clie func (p *participant) run() int64 { seeds := p.peerHub.getSeeds() if len(seeds) == 0 { - log.Println("starting a bootstrap node") + log.Printf("participant.run id=%x action=bootstrap\n", p.id) p.node.Campaign() p.node.InitCluster(genId()) p.node.Add(p.id, p.raftPubAddr, []byte(p.pubAddr)) p.apply(p.node.Next()) } else { - log.Println("joining cluster via peers", seeds) + log.Printf("participant.run id=%x action=join seeds=\"%v\"\n", p.id, seeds) p.join() } @@ -167,14 +167,14 @@ func (p *participant) run() int64 { case <-v2SyncTicker.C: node.Sync() case <-p.stopc: - log.Printf("Participant %x stopped\n", p.id) + log.Printf("participant.stop id=%x\n", p.id) return stopMode } p.apply(node.Next()) p.send(node.Msgs()) if node.IsRemoved() { - log.Printf("Participant %x return\n", p.id) p.stop() + log.Printf("participant.end id=%x\n", p.id) return standbyMode } } @@ -195,6 +195,7 @@ func (p *participant) raftHandler() http.Handler { } func (p *participant) add(id int64, raftPubAddr string, pubAddr string) error { + log.Printf("participant.add id=%x nodeId=%x raftPubAddr=%s pubAddr=%s\n", p.id, id, raftPubAddr, pubAddr) pp := path.Join(v2machineKVPrefix, fmt.Sprint(id)) _, err := p.Get(pp, false, false) @@ -202,12 +203,13 @@ func (p *participant) add(id int64, raftPubAddr string, pubAddr string) error { return nil } if v, ok := err.(*etcdErr.Error); !ok || v.ErrorCode != etcdErr.EcodeKeyNotFound { + log.Printf("participant.add id=%x getErr=\"%v\"\n", p.id, err) return err } w, err := p.Watch(pp, true, false, 0) if err != nil { - log.Println("add error:", err) + log.Printf("participant.add id=%x watchErr=\"%v\"\n", p.id, err) return 
tmpErr } @@ -215,7 +217,7 @@ func (p *participant) add(id int64, raftPubAddr string, pubAddr string) error { case p.addNodeC <- raft.Config{NodeId: id, Addr: raftPubAddr, Context: []byte(pubAddr)}: default: w.Remove() - log.Println("unable to send out addNode proposal") + log.Printf("participant.add id=%x proposeErr=\"unable to send out addNode proposal\"\n", p.id) return tmpErr } @@ -224,11 +226,11 @@ func (p *participant) add(id int64, raftPubAddr string, pubAddr string) error { if v.Action == store.Set { return nil } - log.Println("add error: action =", v.Action) + log.Printf("participant.add id=%x watchErr=\"unexpected action\" action=%s\n", p.id, v.Action) return tmpErr case <-time.After(6 * defaultHeartbeat * p.tickDuration): w.Remove() - log.Println("add error: wait timeout") + log.Printf("participant.add id=%x watchErr=timeout\n", p.id) return tmpErr case <-p.stopc: return stopErr @@ -236,6 +238,7 @@ func (p *participant) add(id int64, raftPubAddr string, pubAddr string) error { } func (p *participant) remove(id int64) error { + log.Printf("participant.remove id=%x nodeId=%x\n", p.id, id) pp := path.Join(v2machineKVPrefix, fmt.Sprint(id)) v, err := p.Get(pp, false, false) @@ -246,7 +249,7 @@ func (p *participant) remove(id int64) error { select { case p.removeNodeC <- raft.Config{NodeId: id}: default: - log.Println("unable to send out removeNode proposal") + log.Printf("participant.remove id=%x proposeErr=\"unable to send out removeNode proposal\"\n", p.id) return tmpErr } @@ -254,7 +257,7 @@ func (p *participant) remove(id int64) error { // removal target is self w, err := p.Watch(pp, true, false, v.Index()+1) if err != nil { - log.Println("remove error:", err) + log.Printf("participant.remove id=%x watchErr=\"%v\"\n", p.id, err) return tmpErr } @@ -263,11 +266,11 @@ func (p *participant) remove(id int64) error { if v.Action == store.Delete { return nil } - log.Println("remove error: action =", v.Action) + log.Printf("participant.remove id=%x 
watchErr=\"unexpected action\" action=%s\n", p.id, v.Action) return tmpErr case <-time.After(6 * defaultHeartbeat * p.tickDuration): w.Remove() - log.Println("remove error: wait timeout") + log.Printf("participant.remove id=%x watchErr=timeout\n", p.id) return tmpErr case <-p.stopc: return stopErr @@ -286,35 +289,36 @@ func (p *participant) apply(ents []raft.Entry) { p.v2apply(offset+int64(i), ent) case raft.ClusterInit: p.clusterId = p.node.ClusterId() + log.Printf("participant.cluster.setId id=%x clusterId=%x\n", p.id, p.clusterId) case raft.AddNode: cfg := new(raft.Config) if err := json.Unmarshal(ent.Data, cfg); err != nil { - log.Println(err) + log.Printf("participant.cluster.addNode id=%x UnmarshalErr=\"%v\"\n", p.id, err) break } peer, err := p.peerHub.add(cfg.NodeId, cfg.Addr) if err != nil { - log.Println(err) + log.Printf("participant.cluster.addNode id=%x peerAddErr=\"%v\"\n", p.id, err) break } peer.participate() - log.Printf("Add Node %x %v %v\n", cfg.NodeId, cfg.Addr, string(cfg.Context)) pp := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId)) p.Store.Set(pp, false, fmt.Sprintf("raft=%v&etcd=%v", cfg.Addr, string(cfg.Context)), store.Permanent) + log.Printf("participant.cluster.addNode id=%x nodeId=%x addr=%s context=%s\n", p.id, cfg.NodeId, cfg.Addr, cfg.Context) case raft.RemoveNode: cfg := new(raft.Config) if err := json.Unmarshal(ent.Data, cfg); err != nil { - log.Println(err) + log.Printf("participant.cluster.removeNode id=%x UnmarshalErr=\"%v\"\n", p.id, err) break } - log.Printf("Remove Node %x\n", cfg.NodeId) peer, err := p.peerHub.peer(cfg.NodeId) if err != nil { - log.Fatal("cannot get the added peer:", err) + log.Fatalf("participant.apply getPeerErr=\"%v\"\n", err) } peer.idle() pp := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId)) p.Store.Delete(pp, false, false) + log.Printf("participant.cluster.removeNode id=%x nodeId=%x\n", p.id, cfg.NodeId) default: panic("unimplemented") } @@ -324,7 +328,7 @@ func (p *participant) apply(ents 
[]raft.Entry) { func (p *participant) send(msgs []raft.Message) { for i := range msgs { if err := p.peerHub.send(msgs[i]); err != nil { - log.Println("send:", err) + log.Printf("participant.send id=%x err=\"%v\"\n", p.id, err) } } } @@ -342,12 +346,11 @@ func (p *participant) join() { if err := p.client.AddMachine(seed, fmt.Sprint(p.id), info); err == nil { return } else { - log.Println(err) + log.Printf("participant.join id=%x addMachineErr=\"%v\"\n", p.id, err) } } time.Sleep(100 * time.Millisecond) } - log.Println("fail to join the cluster") } func genId() int64 { diff --git a/etcd/peer.go b/etcd/peer.go index b007843c1..be5fa4eeb 100644 --- a/etcd/peer.go +++ b/etcd/peer.go @@ -122,7 +122,7 @@ func (p *peer) post(d []byte) { buf := bytes.NewBuffer(d) resp, err := p.c.Post(p.url, "application/octet-stream", buf) if err != nil { - log.Println("post:", err) + log.Printf("peer.post url=%s err=\"%v\"\n", p.url, err) return } resp.Body.Close() diff --git a/etcd/raft_handler.go b/etcd/raft_handler.go index 9cf57e1f5..7f12dba8e 100644 --- a/etcd/raft_handler.go +++ b/etcd/raft_handler.go @@ -74,14 +74,14 @@ func (h *raftHandler) serveRaft(w http.ResponseWriter, r *http.Request) { msg := new(raft.Message) if err := json.NewDecoder(r.Body).Decode(msg); err != nil { - log.Println(err) + log.Printf("raftHandler.serve decodeErr=\"%v\"\n", err) return } select { case h.recv <- msg: default: - log.Println("drop") + log.Printf("raftHandler.serve pushErr=\"recv channel is full\"\n") // drop the incoming package at network layer if the upper layer // cannot consume them in time. // TODO(xiangli): not return 200. 
diff --git a/etcd/standby.go b/etcd/standby.go index ca0a51c67..453ab988c 100644 --- a/etcd/standby.go +++ b/etcd/standby.go @@ -69,12 +69,12 @@ func (s *standby) run() int64 { select { case <-time.After(syncDuration): case <-s.stopc: - log.Printf("Standby stopped\n") + log.Printf("standby.stop\n") return stopMode } if update, err := s.syncCluster(nodes); err != nil { - log.Println("standby sync:", err) + log.Printf("standby.run syncErr=\"%v\"\n", err) continue } else { nodes = update @@ -83,6 +83,7 @@ func (s *standby) run() int64 { if s.clusterConf.ActiveSize <= len(nodes) { continue } + log.Printf("standby.end\n") return participantMode } } diff --git a/etcd/v2_apply.go b/etcd/v2_apply.go index e614d61c5..4d4668f93 100644 --- a/etcd/v2_apply.go +++ b/etcd/v2_apply.go @@ -32,7 +32,7 @@ func (p *participant) v2apply(index int64, ent raft.Entry) { cmd := new(cmd) if err := json.Unmarshal(ent.Data, cmd); err != nil { - log.Println("v2apply.decode:", err) + log.Printf("participant.store.apply id=%x decodeErr=\"%v\"\n", p.id, err) return } @@ -53,7 +53,7 @@ func (p *participant) v2apply(index int64, ent raft.Entry) { p.Store.DeleteExpiredKeys(cmd.Time) return default: - log.Println("unexpected command type:", cmd.Type) + log.Printf("participant.store.apply id=%x err=\"unexpected command type %s\"\n", p.id, cmd.Type) } if ent.Term > p.node.term { diff --git a/etcd/v2_client.go b/etcd/v2_client.go index 583ffefb8..26cca09d2 100644 --- a/etcd/v2_client.go +++ b/etcd/v2_client.go @@ -24,7 +24,6 @@ import ( "fmt" "io" "io/ioutil" - "log" "net/http" "strconv" "strings" @@ -155,7 +154,6 @@ func (c *v2client) AddMachine(url string, name string, info *context) *etcdErr.E b, _ := json.Marshal(info) url = url + "/v2/admin/machines/" + name - log.Printf("Send Join Request to %s", url) resp, err := c.put(url, b) if err != nil { return clientError(err) @@ -182,7 +180,6 @@ func (c *v2client) readErrorBody(body io.ReadCloser) *etcdErr.Error { func (c *v2client) readJSONBody(body 
io.ReadCloser, val interface{}) *etcdErr.Error { if err := json.NewDecoder(body).Decode(val); err != nil { - log.Printf("Error parsing join response: %v", err) return clientError(err) } c.readBody(body) diff --git a/etcd/v2_http.go b/etcd/v2_http.go index f83d11b95..4f3b19418 100644 --- a/etcd/v2_http.go +++ b/etcd/v2_http.go @@ -98,7 +98,7 @@ func (eh handlerErr) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - log.Println("http error", err) + log.Printf("HTTP.serve: req=%s err=\"%v\"\n", r.URL, err) http.Error(w, "Internal Server Error", http.StatusInternalServerError) } @@ -118,7 +118,6 @@ func (w *HEADResponseWriter) Write([]byte) (int, error) { func (p *participant) redirect(w http.ResponseWriter, r *http.Request, id int64) error { e, err := p.Store.Get(fmt.Sprintf("%v/%d", v2machineKVPrefix, p.node.Leader()), false, false) if err != nil { - log.Println("redirect cannot find node", id) return fmt.Errorf("redirect cannot find node %d", id) } @@ -129,7 +128,6 @@ func (p *participant) redirect(w http.ResponseWriter, r *http.Request, id int64) redirectAddr, err := buildRedirectURL(m["etcd"][0], r.URL) if err != nil { - log.Println("redirect cannot build new url:", err) return err } @@ -140,7 +138,7 @@ func (p *participant) redirect(w http.ResponseWriter, r *http.Request, id int64) func buildRedirectURL(redirectAddr string, originalURL *url.URL) (string, error) { redirectURL, err := url.Parse(redirectAddr) if err != nil { - return "", fmt.Errorf("redirect cannot parse url: %v", err) + return "", fmt.Errorf("cannot parse url: %v", err) } redirectURL.Path = originalURL.Path diff --git a/etcd/v2_http_delete.go b/etcd/v2_http_delete.go index 703d9c333..a8efc365a 100644 --- a/etcd/v2_http_delete.go +++ b/etcd/v2_http_delete.go @@ -17,7 +17,6 @@ limitations under the License. 
package etcd import ( - "log" "net/http" "strconv" @@ -70,7 +69,6 @@ func (p *participant) serveDelete(w http.ResponseWriter, req *http.Request, key p.handleRet(w, ret) return nil } - log.Println("delete:", err) return err } @@ -80,6 +78,5 @@ func (p *participant) serveCAD(w http.ResponseWriter, req *http.Request, key str p.handleRet(w, ret) return nil } - log.Println("cad:", err) return err } diff --git a/etcd/v2_http_post.go b/etcd/v2_http_post.go index 30c1e514b..562769755 100644 --- a/etcd/v2_http_post.go +++ b/etcd/v2_http_post.go @@ -17,7 +17,6 @@ limitations under the License. package etcd import ( - "log" "net/http" etcdErr "github.com/coreos/etcd/error" @@ -43,6 +42,5 @@ func (p *participant) PostHandler(w http.ResponseWriter, req *http.Request) erro p.handleRet(w, ret) return nil } - log.Println("unique:", err) return err } diff --git a/etcd/v2_http_put.go b/etcd/v2_http_put.go index cb57096c8..dd34086ef 100644 --- a/etcd/v2_http_put.go +++ b/etcd/v2_http_put.go @@ -19,7 +19,6 @@ package etcd import ( "encoding/json" "fmt" - "log" "net/http" "net/url" "strconv" @@ -115,7 +114,6 @@ func (p *participant) serveSet(w http.ResponseWriter, req *http.Request, key str p.handleRet(w, ret) return nil } - log.Println("set:", err) return err } @@ -125,7 +123,6 @@ func (p *participant) serveCreate(w http.ResponseWriter, req *http.Request, key p.handleRet(w, ret) return nil } - log.Println("create:", err) return err } @@ -139,7 +136,6 @@ func (p *participant) serveUpdate(w http.ResponseWriter, req *http.Request, key, p.handleRet(w, ret) return nil } - log.Println("update:", err) return err } @@ -149,7 +145,6 @@ func (p *participant) serveCAS(w http.ResponseWriter, req *http.Request, key, va p.handleRet(w, ret) return nil } - log.Println("update:", err) return err }