mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: protect the sender map in SendHub
This commit is contained in:
parent
5f16fab541
commit
ceb077424d
@ -21,6 +21,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"path"
|
"path"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/coreos/etcd/etcdserver/stats"
|
"github.com/coreos/etcd/etcdserver/stats"
|
||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
@ -48,6 +49,7 @@ type sendHub struct {
|
|||||||
p rafthttp.Processor
|
p rafthttp.Processor
|
||||||
ss *stats.ServerStats
|
ss *stats.ServerStats
|
||||||
ls *stats.LeaderStats
|
ls *stats.LeaderStats
|
||||||
|
mu sync.RWMutex // protect the sender map
|
||||||
senders map[types.ID]rafthttp.Sender
|
senders map[types.ID]rafthttp.Sender
|
||||||
shouldstop chan struct{}
|
shouldstop chan struct{}
|
||||||
}
|
}
|
||||||
@ -67,7 +69,11 @@ func newSendHub(t http.RoundTripper, cl ClusterInfo, p rafthttp.Processor, ss *s
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *sendHub) Sender(id types.ID) rafthttp.Sender { return h.senders[id] }
|
func (h *sendHub) Sender(id types.ID) rafthttp.Sender {
|
||||||
|
h.mu.RLock()
|
||||||
|
defer h.mu.RUnlock()
|
||||||
|
return h.senders[id]
|
||||||
|
}
|
||||||
|
|
||||||
func (h *sendHub) Send(msgs []raftpb.Message) {
|
func (h *sendHub) Send(msgs []raftpb.Message) {
|
||||||
for _, m := range msgs {
|
for _, m := range msgs {
|
||||||
@ -102,6 +108,8 @@ func (h *sendHub) ShouldStopNotify() <-chan struct{} {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (h *sendHub) Add(m *Member) {
|
func (h *sendHub) Add(m *Member) {
|
||||||
|
h.mu.Lock()
|
||||||
|
defer h.mu.Unlock()
|
||||||
if _, ok := h.senders[m.ID]; ok {
|
if _, ok := h.senders[m.ID]; ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -118,11 +126,15 @@ func (h *sendHub) Add(m *Member) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (h *sendHub) Remove(id types.ID) {
|
func (h *sendHub) Remove(id types.ID) {
|
||||||
|
h.mu.Lock()
|
||||||
|
defer h.mu.Unlock()
|
||||||
h.senders[id].Stop()
|
h.senders[id].Stop()
|
||||||
delete(h.senders, id)
|
delete(h.senders, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *sendHub) Update(m *Member) {
|
func (h *sendHub) Update(m *Member) {
|
||||||
|
h.mu.Lock()
|
||||||
|
defer h.mu.Unlock()
|
||||||
// TODO: return error or just panic?
|
// TODO: return error or just panic?
|
||||||
if _, ok := h.senders[m.ID]; !ok {
|
if _, ok := h.senders[m.ID]; !ok {
|
||||||
return
|
return
|
||||||
|
Loading…
x
Reference in New Issue
Block a user