mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcd: transporter->rafthandler
This commit is contained in:
parent
d2a553f6c4
commit
b8b5734689
14
etcd/etcd.go
14
etcd/etcd.go
@ -65,7 +65,7 @@ type Server struct {
|
||||
tickDuration time.Duration
|
||||
|
||||
client *v2client
|
||||
t *transporter
|
||||
rh *raftHandler
|
||||
node *v2Raft
|
||||
store.Store
|
||||
|
||||
@ -131,7 +131,7 @@ func New(c *config.Config, id int64) *Server {
|
||||
modeC: make(chan int, 10),
|
||||
stop: make(chan struct{}),
|
||||
}
|
||||
s.t = newTransporter(s.peerHub)
|
||||
s.rh = newRaftHandler(s.peerHub)
|
||||
|
||||
for _, seed := range c.Peers {
|
||||
s.nodes[seed] = true
|
||||
@ -157,7 +157,7 @@ func (s *Server) SetTick(d time.Duration) {
|
||||
}
|
||||
|
||||
func (s *Server) RaftHandler() http.Handler {
|
||||
return s.t
|
||||
return s.rh
|
||||
}
|
||||
|
||||
func (s *Server) ClusterConfig() *config.ClusterConfig {
|
||||
@ -184,7 +184,7 @@ func (s *Server) Stop() {
|
||||
}
|
||||
s.mode = stop
|
||||
|
||||
s.t.stop()
|
||||
s.rh.stop()
|
||||
s.client.CloseConnections()
|
||||
s.peerHub.stop()
|
||||
close(s.stop)
|
||||
@ -324,7 +324,7 @@ func (s *Server) initParticipant() {
|
||||
s.proposal = make(chan v2Proposal, maxBufferedProposal)
|
||||
s.addNodeC = make(chan raft.Config, 1)
|
||||
s.removeNodeC = make(chan raft.Config, 1)
|
||||
s.t.start()
|
||||
s.rh.start()
|
||||
}
|
||||
|
||||
func (s *Server) initStandby() {
|
||||
@ -358,10 +358,10 @@ func (s *Server) run() {
|
||||
func (s *Server) runParticipant() {
|
||||
defer func() {
|
||||
s.node.StopProposalWaiters()
|
||||
s.t.stop()
|
||||
s.rh.stop()
|
||||
}()
|
||||
node := s.node
|
||||
recv := s.t.recv
|
||||
recv := s.rh.recv
|
||||
ticker := time.NewTicker(s.tickDuration)
|
||||
v2SyncTicker := time.NewTicker(time.Millisecond * 500)
|
||||
|
||||
|
@ -375,8 +375,8 @@ func initTestServer(c *config.Config, id int64, tls bool) (e *Server, h *httptes
|
||||
e.SetTick(time.Millisecond * 5)
|
||||
m := http.NewServeMux()
|
||||
m.Handle("/", e)
|
||||
m.Handle("/raft", e.t)
|
||||
m.Handle("/raft/", e.t)
|
||||
m.Handle("/raft", e.RaftHandler())
|
||||
m.Handle("/raft/", e.RaftHandler())
|
||||
|
||||
if tls {
|
||||
h = httptest.NewTLSServer(m)
|
||||
|
@ -2,7 +2,6 @@ package etcd
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"log"
|
||||
"net/http"
|
||||
"strconv"
|
||||
@ -11,7 +10,7 @@ import (
|
||||
"github.com/coreos/etcd/raft"
|
||||
)
|
||||
|
||||
type transporter struct {
|
||||
type raftHandler struct {
|
||||
mu sync.RWMutex
|
||||
serving bool
|
||||
|
||||
@ -21,33 +20,33 @@ type transporter struct {
|
||||
*http.ServeMux
|
||||
}
|
||||
|
||||
func newTransporter(p peerGetter) *transporter {
|
||||
t := &transporter{
|
||||
func newRaftHandler(p peerGetter) *raftHandler {
|
||||
h := &raftHandler{
|
||||
recv: make(chan *raft.Message, 512),
|
||||
peerGetter: p,
|
||||
}
|
||||
t.ServeMux = http.NewServeMux()
|
||||
t.ServeMux.HandleFunc("/raft/cfg/", t.serveCfg)
|
||||
t.ServeMux.HandleFunc("/raft", t.serveRaft)
|
||||
return t
|
||||
h.ServeMux = http.NewServeMux()
|
||||
h.ServeMux.HandleFunc("/raft/cfg/", h.serveCfg)
|
||||
h.ServeMux.HandleFunc("/raft", h.serveRaft)
|
||||
return h
|
||||
}
|
||||
|
||||
func (t *transporter) start() {
|
||||
t.mu.Lock()
|
||||
t.serving = true
|
||||
t.mu.Unlock()
|
||||
func (h *raftHandler) start() {
|
||||
h.mu.Lock()
|
||||
h.serving = true
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
func (t *transporter) stop() {
|
||||
t.mu.Lock()
|
||||
t.serving = false
|
||||
t.mu.Unlock()
|
||||
func (h *raftHandler) stop() {
|
||||
h.mu.Lock()
|
||||
h.serving = false
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
func (t *transporter) serveRaft(w http.ResponseWriter, r *http.Request) {
|
||||
t.mu.RLock()
|
||||
serving := t.serving
|
||||
t.mu.RUnlock()
|
||||
func (h *raftHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
|
||||
h.mu.RLock()
|
||||
serving := h.serving
|
||||
h.mu.RUnlock()
|
||||
if !serving {
|
||||
http.Error(w, "404 page not found", http.StatusNotFound)
|
||||
return
|
||||
@ -60,7 +59,7 @@ func (t *transporter) serveRaft(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
select {
|
||||
case t.recv <- msg:
|
||||
case h.recv <- msg:
|
||||
default:
|
||||
log.Println("drop")
|
||||
// drop the incoming package at network layer if the upper layer
|
||||
@ -70,10 +69,10 @@ func (t *transporter) serveRaft(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
func (t *transporter) serveCfg(w http.ResponseWriter, r *http.Request) {
|
||||
t.mu.RLock()
|
||||
serving := t.serving
|
||||
t.mu.RUnlock()
|
||||
func (h *raftHandler) serveCfg(w http.ResponseWriter, r *http.Request) {
|
||||
h.mu.RLock()
|
||||
serving := h.serving
|
||||
h.mu.RUnlock()
|
||||
if !serving {
|
||||
http.Error(w, "404 page not found", http.StatusNotFound)
|
||||
return
|
||||
@ -84,7 +83,7 @@ func (t *transporter) serveCfg(w http.ResponseWriter, r *http.Request) {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
p, err := t.peerGetter.peer(id)
|
||||
p, err := h.peerGetter.peer(id)
|
||||
if err == nil {
|
||||
w.Write([]byte(p.url))
|
||||
return
|
||||
|
Loading…
x
Reference in New Issue
Block a user