diff --git a/etcd/etcd.go b/etcd/etcd.go index 2867e38ec..e1262f3d0 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -480,12 +480,7 @@ func (s *Server) apply(ents []raft.Entry) { func (s *Server) send(msgs []raft.Message) { for i := range msgs { - data, err := json.Marshal(msgs[i]) - if err != nil { - // todo(xiangli): error handling - log.Fatal(err) - } - if err = s.peerHub.send(msgs[i].To, data); err != nil { + if err := s.peerHub.send(msgs[i]); err != nil { log.Println("send:", err) } } diff --git a/etcd/peer_hub.go b/etcd/peer_hub.go index 28436d792..0b2b54722 100644 --- a/etcd/peer_hub.go +++ b/etcd/peer_hub.go @@ -1,6 +1,7 @@ package etcd import ( + "encoding/json" "errors" "fmt" "io/ioutil" @@ -8,6 +9,8 @@ import ( "net/url" "path" "sync" + + "github.com/coreos/etcd/raft" ) var ( @@ -19,10 +22,11 @@ type peerGetter interface { } type peerHub struct { - mu sync.RWMutex - seeds map[string]bool - peers map[int64]*peer - c *http.Client + mu sync.RWMutex + stopped bool + seeds map[string]bool + peers map[int64]*peer + c *http.Client } func newPeerHub(seeds []string, c *http.Client) *peerHub { @@ -38,6 +42,9 @@ func newPeerHub(seeds []string, c *http.Client) *peerHub { } func (h *peerHub) stop() { + h.mu.Lock() + defer h.mu.Unlock() + h.stopped = true for _, p := range h.peers { p.stop() } @@ -48,6 +55,9 @@ func (h *peerHub) stop() { func (h *peerHub) peer(id int64) (*peer, error) { h.mu.Lock() defer h.mu.Unlock() + if h.stopped { + return nil, fmt.Errorf("peerHub stopped") + } if p, ok := h.peers[id]; ok { return p, nil } @@ -63,12 +73,19 @@ func (h *peerHub) add(id int64, rawurl string) (*peer, error) { h.mu.Lock() defer h.mu.Unlock() + if h.stopped { + return nil, fmt.Errorf("peerHub stopped") + } h.peers[id] = newPeer(u.String(), h.c) return h.peers[id], nil } -func (h *peerHub) send(nodeId int64, data []byte) error { - if p, err := h.fetch(nodeId); err == nil { +func (h *peerHub) send(msg raft.Message) error { + if p, err := h.fetch(msg.To); err == nil { + data, err := json.Marshal(msg) + if err != nil { + return err + } return p.send(data) } return errUnknownPeer