mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
server: use status for transporter
This commit is contained in:
parent
429b9487f7
commit
a274e5b192
@ -17,13 +17,18 @@ import (
|
|||||||
"github.com/coreos/etcd/raft"
|
"github.com/coreos/etcd/raft"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
serving int = iota
|
||||||
|
stopped
|
||||||
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
errUnknownNode = errors.New("unknown node")
|
errUnknownNode = errors.New("unknown node")
|
||||||
)
|
)
|
||||||
|
|
||||||
type transporter struct {
|
type transporter struct {
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
stopped bool
|
status int
|
||||||
urls map[int64]string
|
urls map[int64]string
|
||||||
|
|
||||||
recv chan *raft.Message
|
recv chan *raft.Message
|
||||||
@ -50,13 +55,13 @@ func newTransporter(tc *tls.Config) *transporter {
|
|||||||
|
|
||||||
func (t *transporter) start() {
|
func (t *transporter) start() {
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
t.stopped = false
|
t.status = serving
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *transporter) stop() {
|
func (t *transporter) stop() {
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
t.stopped = true
|
t.status = stopped
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -91,7 +96,7 @@ func (t *transporter) sendTo(nodeId int64, data []byte) error {
|
|||||||
|
|
||||||
func (t *transporter) send(addr string, data []byte) error {
|
func (t *transporter) send(addr string, data []byte) error {
|
||||||
t.mu.RLock()
|
t.mu.RLock()
|
||||||
if t.stopped {
|
if t.status == stopped {
|
||||||
t.mu.RUnlock()
|
t.mu.RUnlock()
|
||||||
return fmt.Errorf("transporter stopped")
|
return fmt.Errorf("transporter stopped")
|
||||||
}
|
}
|
||||||
@ -134,12 +139,12 @@ func (t *transporter) fetchAddr(seedurl string, id int64) error {
|
|||||||
|
|
||||||
func (t *transporter) serveRaft(w http.ResponseWriter, r *http.Request) {
|
func (t *transporter) serveRaft(w http.ResponseWriter, r *http.Request) {
|
||||||
t.mu.RLock()
|
t.mu.RLock()
|
||||||
if t.stopped {
|
status := t.status
|
||||||
t.mu.RUnlock()
|
t.mu.RUnlock()
|
||||||
|
if status == stopped {
|
||||||
http.Error(w, "404 page not found", http.StatusNotFound)
|
http.Error(w, "404 page not found", http.StatusNotFound)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
t.mu.RUnlock()
|
|
||||||
|
|
||||||
msg := new(raft.Message)
|
msg := new(raft.Message)
|
||||||
if err := json.NewDecoder(r.Body).Decode(msg); err != nil {
|
if err := json.NewDecoder(r.Body).Decode(msg); err != nil {
|
||||||
@ -160,12 +165,12 @@ func (t *transporter) serveRaft(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
func (t *transporter) serveCfg(w http.ResponseWriter, r *http.Request) {
|
func (t *transporter) serveCfg(w http.ResponseWriter, r *http.Request) {
|
||||||
t.mu.RLock()
|
t.mu.RLock()
|
||||||
if t.stopped {
|
status := t.status
|
||||||
t.mu.RUnlock()
|
t.mu.RUnlock()
|
||||||
|
if status == stopped {
|
||||||
http.Error(w, "404 page not found", http.StatusNotFound)
|
http.Error(w, "404 page not found", http.StatusNotFound)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
t.mu.RUnlock()
|
|
||||||
|
|
||||||
id, err := strconv.ParseInt(r.URL.Path[len("/raft/cfg/"):], 10, 64)
|
id, err := strconv.ParseInt(r.URL.Path[len("/raft/cfg/"):], 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user