Merge pull request #2701 from yichengq/rafthttp-anon
rafthttp: add remotes
commit ebecee34e0
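What this change does, condensed from the hunks below: a member joining an existing cluster must catch up on snapshots and log entries before its own ConfChange commits, so the rafthttp transport gains lightweight "remotes" for that catch-up phase. The Cluster type stops tracking a bootstrap index and no longer owns the transport; NewServer now wires peers and remotes itself:

	tr := rafthttp.NewTransporter(cfg.Transport, id, cfg.Cluster.ID(), srv, srv.errorc, sstats, lstats)
	// members learned from the existing cluster become remotes (catch-up only)
	for _, m := range remotes {
		if m.ID != id {
			tr.AddRemote(m.ID, m.PeerURLs)
		}
	}
	// committed cluster members become full peers
	for _, m := range cfg.Cluster.Members() {
		if m.ID != id {
			tr.AddPeer(m.ID, m.PeerURLs)
		}
	}
	srv.r.transport = tr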
@@ -191,7 +191,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
 		Handler: etcdhttp.NewClientHandler(s),
 		Info:    cfg.corsInfo,
 	}
-	ph := etcdhttp.NewPeerHandler(s.Cluster, etcdserver.RaftTimer(s), s.RaftHandler())
+	ph := etcdhttp.NewPeerHandler(s.Cluster, s.RaftHandler())
 	// Start the peer server in a goroutine
 	for _, l := range plns {
 		go func(l net.Listener) {
@@ -30,7 +30,6 @@ import (
 	"github.com/coreos/etcd/pkg/netutil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft/raftpb"
-	"github.com/coreos/etcd/rafthttp"
 	"github.com/coreos/etcd/store"
 )
 
@@ -60,18 +59,7 @@ type Cluster struct {
 	id    types.ID
 	token string
 	store store.Store
-	// index is the raft index that cluster is updated at bootstrap
-	// from remote cluster info.
-	// It may have a higher value than local raft index, because it
-	// displays a further view of the cluster.
-	// TODO: upgrade it as last modified index
-	index uint64
-
-	// transport and members maintains the view of the cluster at index.
-	// This might be more up to date than what stores in the store since
-	// the index may be higher than store index, which may happen when the
-	// cluster is updated from remote cluster info.
-	transport rafthttp.Transporter
 	sync.Mutex // guards members and removed map
 	members map[types.ID]*Member
 	// removed contains the ids of removed members in the cluster.
@@ -242,23 +230,8 @@ func (c *Cluster) SetID(id types.ID) { c.id = id }
 
 func (c *Cluster) SetStore(st store.Store) { c.store = st }
 
-func (c *Cluster) UpdateIndex(index uint64) { c.index = index }
-
 func (c *Cluster) Recover() {
 	c.members, c.removed = membersFromStore(c.store)
-	// recover transport
-	c.transport.RemoveAllPeers()
-	for _, m := range c.Members() {
-		c.transport.AddPeer(m.ID, m.PeerURLs)
-	}
-}
-
-func (c *Cluster) SetTransport(tr rafthttp.Transporter) {
-	c.transport = tr
-	// add all the remote members into transport
-	for _, m := range c.Members() {
-		c.transport.AddPeer(m.ID, m.PeerURLs)
-	}
 }
 
 // ValidateConfigurationChange takes a proposed ConfChange and
@@ -324,8 +297,7 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
 // AddMember adds a new Member into the cluster, and saves the given member's
 // raftAttributes into the store. The given member should have empty attributes.
 // A Member with a matching id must not exist.
-// The given index indicates when the event happens.
-func (c *Cluster) AddMember(m *Member, index uint64) {
+func (c *Cluster) AddMember(m *Member) {
 	c.Lock()
 	defer c.Unlock()
 	b, err := json.Marshal(m.RaftAttributes)
@@ -336,37 +308,22 @@ func (c *Cluster) AddMember(m *Member, index uint64) {
 	if _, err := c.store.Create(p, false, string(b), false, store.Permanent); err != nil {
 		log.Panicf("create raftAttributes should never fail: %v", err)
 	}
-	if index > c.index {
-		// TODO: check member does not exist in the cluster
-		// New bootstrapped member has initial cluster, which contains unadded
-		// peers.
-		c.members[m.ID] = m
-		c.transport.AddPeer(m.ID, m.PeerURLs)
-		c.index = index
-	}
+	c.members[m.ID] = m
 }
 
 // RemoveMember removes a member from the store.
 // The given id MUST exist, or the function panics.
-// The given index indicates when the event happens.
-func (c *Cluster) RemoveMember(id types.ID, index uint64) {
+func (c *Cluster) RemoveMember(id types.ID) {
 	c.Lock()
 	defer c.Unlock()
 	if _, err := c.store.Delete(memberStoreKey(id), true, true); err != nil {
 		log.Panicf("delete member should never fail: %v", err)
 	}
+	delete(c.members, id)
 	if _, err := c.store.Create(removedMemberStoreKey(id), false, "", false, store.Permanent); err != nil {
 		log.Panicf("create removedMember should never fail: %v", err)
 	}
-	if index > c.index {
-		if _, ok := c.members[id]; !ok {
-			log.Panicf("member %s should exist in the cluster", id)
-		}
-		delete(c.members, id)
-		c.removed[id] = true
-		c.transport.RemovePeer(id)
-		c.index = index
-	}
+	c.removed[id] = true
 }
 
 func (c *Cluster) UpdateAttributes(id types.ID, attr Attributes) {
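With the index bookkeeping gone, the Cluster mutators above touch only the store and the in-memory maps; all transport side effects have moved to EtcdServer.applyConfChange (see the server.go hunks below). A minimal calling sketch, mirroring the test hunks in this diff (newCluster and the index-free signatures come from this change; the sketch itself is illustrative):

	cl := newCluster("")
	cl.SetStore(store.New())
	for i := 1; i <= 4; i++ {
		cl.AddMember(&Member{ID: types.ID(i)}) // writes the store and the members map only
	}
	cl.RemoveMember(4) // no transport call here any more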
@@ -376,9 +333,7 @@ func (c *Cluster) UpdateAttributes(id types.ID, attr Attributes) {
 	// TODO: update store in this function
 }
 
-// UpdateRaftAttributes updates the raft attributes of the given id.
-// The given index indicates when the event happens.
-func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes, index uint64) {
+func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
 	c.Lock()
 	defer c.Unlock()
 	b, err := json.Marshal(raftAttr)
@@ -389,11 +344,7 @@ func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes, ind
 	if _, err := c.store.Update(p, string(b), store.Permanent); err != nil {
 		log.Panicf("update raftAttributes should never fail: %v", err)
 	}
-	if index > c.index {
-		c.members[id].RaftAttributes = raftAttr
-		c.transport.UpdatePeer(id, raftAttr.PeerURLs)
-		c.index = index
-	}
+	c.members[id].RaftAttributes = raftAttr
 }
 
 // Validate ensures that there is no identical urls in the cluster peer list
@@ -96,9 +96,8 @@ func TestClusterFromStore(t *testing.T) {
 	for i, tt := range tests {
 		hc := newTestCluster(nil)
 		hc.SetStore(store.New())
-		hc.SetTransport(&nopTransporter{})
-		for j, m := range tt.mems {
-			hc.AddMember(m, uint64(j))
+		for _, m := range tt.mems {
+			hc.AddMember(m)
 		}
 		c := NewClusterFromStore("abc", hc.store)
 		if c.token != "abc" {
@@ -358,12 +357,11 @@ func TestClusterValidateAndAssignIDs(t *testing.T) {
 func TestClusterValidateConfigurationChange(t *testing.T) {
 	cl := newCluster("")
 	cl.SetStore(store.New())
-	cl.SetTransport(&nopTransporter{})
 	for i := 1; i <= 4; i++ {
 		attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", i)}}
-		cl.AddMember(&Member{ID: types.ID(i), RaftAttributes: attr}, uint64(i))
+		cl.AddMember(&Member{ID: types.ID(i), RaftAttributes: attr})
 	}
-	cl.RemoveMember(4, 5)
+	cl.RemoveMember(4)
 
 	attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}}
 	ctx, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr})
@@ -491,8 +489,7 @@ func TestClusterGenID(t *testing.T) {
 	previd := cs.ID()
 
 	cs.SetStore(&storeRecorder{})
-	cs.SetTransport(&nopTransporter{})
-	cs.AddMember(newTestMember(3, nil, "", nil), 1)
+	cs.AddMember(newTestMember(3, nil, "", nil))
 	cs.genID()
 	if cs.ID() == previd {
 		t.Fatalf("cluster.ID = %v, want not %v", cs.ID(), previd)
@@ -535,8 +532,7 @@ func TestClusterAddMember(t *testing.T) {
 	st := &storeRecorder{}
 	c := newTestCluster(nil)
 	c.SetStore(st)
-	c.SetTransport(&nopTransporter{})
-	c.AddMember(newTestMember(1, nil, "node1", nil), 1)
+	c.AddMember(newTestMember(1, nil, "node1", nil))
 
 	wactions := []testutil.Action{
 		{
@@ -621,14 +617,10 @@ func TestClusterString(t *testing.T) {
 }
 
 func TestClusterRemoveMember(t *testing.T) {
-	c := newTestCluster(nil)
-	c.SetStore(&storeRecorder{})
-	c.SetTransport(&nopTransporter{})
-	c.AddMember(newTestMember(1, nil, "", nil), 1)
-
 	st := &storeRecorder{}
+	c := newTestCluster(nil)
 	c.SetStore(st)
-	c.RemoveMember(1, 2)
+	c.RemoveMember(1)
 
 	wactions := []testutil.Action{
 		{Name: "Delete", Params: []interface{}{memberStoreKey(1), true, true}},
@@ -21,7 +21,6 @@ import (
 	"log"
 	"net/http"
 	"sort"
-	"strconv"
 	"time"
 
 	"github.com/coreos/etcd/pkg/types"
@@ -89,21 +88,7 @@ func getClusterFromRemotePeers(urls []string, logerr bool, tr *http.Transport) (
 			}
 			continue
 		}
-		var index uint64
-		// The header at or before v2.0.3 doesn't have this field. For backward
-		// compatibility, it checks whether the field exists.
-		if indexStr := resp.Header.Get("X-Raft-Index"); indexStr != "" {
-			index, err = strconv.ParseUint(indexStr, 10, 64)
-			if err != nil {
-				if logerr {
-					log.Printf("etcdserver: could not parse raft index: %v", err)
-				}
-				continue
-			}
-		}
-		cl := NewClusterFromMembers("", id, membs)
-		cl.UpdateIndex(index)
-		return cl, nil
+		return NewClusterFromMembers("", id, membs), nil
 	}
 	return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls")
 }
@@ -18,7 +18,6 @@ import (
 	"encoding/json"
 	"log"
 	"net/http"
-	"strconv"
 
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/rafthttp"
@@ -29,10 +28,9 @@ const (
 )
 
 // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
-func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, timer etcdserver.RaftTimer, raftHandler http.Handler) http.Handler {
+func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, raftHandler http.Handler) http.Handler {
 	mh := &peerMembersHandler{
 		clusterInfo: clusterInfo,
-		timer:       timer,
 	}
 
 	mux := http.NewServeMux()
@@ -46,7 +44,6 @@ func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, timer etcdserver.RaftTim
 
 type peerMembersHandler struct {
 	clusterInfo etcdserver.ClusterInfo
-	timer       etcdserver.RaftTimer
 }
 
 func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -54,7 +51,6 @@ func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
-	w.Header().Set("X-Raft-Index", strconv.FormatUint(h.timer.Index(), 10))
 
 	if r.URL.Path != peerMembersPrefix {
 		http.Error(w, "bad path", http.StatusBadRequest)
@@ -33,7 +33,7 @@ func TestNewPeerHandlerOnRaftPrefix(t *testing.T) {
 	h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		w.Write([]byte("test data"))
 	})
-	ph := NewPeerHandler(&fakeCluster{}, &dummyRaftTimer{}, h)
+	ph := NewPeerHandler(&fakeCluster{}, h)
 	srv := httptest.NewServer(ph)
 	defer srv.Close()
 
@@ -91,7 +91,7 @@ func TestServeMembersGet(t *testing.T) {
 		id:      1,
 		members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2},
 	}
-	h := &peerMembersHandler{clusterInfo: cluster, timer: &dummyRaftTimer{}}
+	h := &peerMembersHandler{clusterInfo: cluster}
 	msb, err := json.Marshal([]etcdserver.Member{memb1, memb2})
 	if err != nil {
 		t.Fatal(err)
@@ -168,6 +168,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 	haveWAL := wal.Exist(cfg.WALDir())
 	ss := snap.New(cfg.SnapDir())
 
+	var remotes []*Member
 	switch {
 	case !haveWAL && !cfg.NewCluster:
 		if err := cfg.VerifyJoinExisting(); err != nil {
@@ -180,7 +181,7 @@
 		if err := ValidateClusterAndAssignIDs(cfg.Cluster, existingCluster); err != nil {
 			return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
 		}
-		cfg.Cluster.UpdateIndex(existingCluster.index)
+		remotes = existingCluster.Members()
 		cfg.Cluster.SetID(existingCluster.id)
 		cfg.Cluster.SetStore(st)
 		cfg.Print()
@@ -271,9 +272,20 @@
 		reqIDGen: idutil.NewGenerator(uint8(id), time.Now()),
 	}
 
+	// TODO: move transport initialization near the definition of remote
 	tr := rafthttp.NewTransporter(cfg.Transport, id, cfg.Cluster.ID(), srv, srv.errorc, sstats, lstats)
+	// add all remotes into transport
+	for _, m := range remotes {
+		if m.ID != id {
+			tr.AddRemote(m.ID, m.PeerURLs)
+		}
+	}
+	for _, m := range cfg.Cluster.Members() {
+		if m.ID != id {
+			tr.AddPeer(m.ID, m.PeerURLs)
+		}
+	}
 	srv.r.transport = tr
-	srv.Cluster.SetTransport(tr)
 	return srv, nil
 }
 
@@ -374,11 +386,15 @@ func (s *EtcdServer) run() {
 			if err := s.store.Recovery(apply.snapshot.Data); err != nil {
 				log.Panicf("recovery store error: %v", err)
 			}
-			// Avoid snapshot recovery overwriting newer cluster and
-			// transport setting, which may block the communication.
-			if s.Cluster.index < apply.snapshot.Metadata.Index {
-				s.Cluster.Recover()
-			}
+			s.Cluster.Recover()
+
+			// recover raft transport
+			s.r.transport.RemoveAllPeers()
+			for _, m := range s.Cluster.Members() {
+				if m.ID == s.ID() {
+					continue
+				}
+				s.r.transport.AddPeer(m.ID, m.PeerURLs)
+			}
 
 			appliedi = apply.snapshot.Metadata.Index
@@ -680,7 +696,7 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
 		case raftpb.EntryConfChange:
 			var cc raftpb.ConfChange
 			pbutil.MustUnmarshal(&cc, e.Data)
-			shouldstop, err = s.applyConfChange(cc, confState, e.Index)
+			shouldstop, err = s.applyConfChange(cc, confState)
 			s.w.Trigger(cc.ID, err)
 		default:
 			log.Panicf("entry type should be either EntryNormal or EntryConfChange")
@@ -745,9 +761,9 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
 	}
 }
 
-// applyConfChange applies a ConfChange to the server at the given index. It is only
-// invoked with a ConfChange that has already passed through Raft.
-func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState, index uint64) (bool, error) {
+// applyConfChange applies a ConfChange to the server. It is only
+// invoked with a ConfChange that has already passed through Raft
+func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) {
 	if err := s.Cluster.ValidateConfigurationChange(cc); err != nil {
 		cc.NodeID = raft.None
 		s.r.ApplyConfChange(cc)
@@ -763,18 +779,20 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 		if cc.NodeID != uint64(m.ID) {
 			log.Panicf("nodeID should always be equal to member ID")
 		}
-		s.Cluster.AddMember(m, index)
+		s.Cluster.AddMember(m)
 		if m.ID == s.id {
 			log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		} else {
+			s.r.transport.AddPeer(m.ID, m.PeerURLs)
 			log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		}
 	case raftpb.ConfChangeRemoveNode:
 		id := types.ID(cc.NodeID)
-		s.Cluster.RemoveMember(id, index)
+		s.Cluster.RemoveMember(id)
 		if id == s.id {
 			return true, nil
 		} else {
+			s.r.transport.RemovePeer(id)
 			log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID())
 		}
 	case raftpb.ConfChangeUpdateNode:
@@ -785,10 +803,11 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 		if cc.NodeID != uint64(m.ID) {
 			log.Panicf("nodeID should always be equal to member ID")
 		}
-		s.Cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes, index)
+		s.Cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes)
 		if m.ID == s.id {
 			log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		} else {
+			s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
 			log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		}
 	}
@@ -411,11 +411,10 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
 func TestApplyConfChangeError(t *testing.T) {
 	cl := newCluster("")
 	cl.SetStore(store.New())
-	cl.SetTransport(&nopTransporter{})
 	for i := 1; i <= 4; i++ {
-		cl.AddMember(&Member{ID: types.ID(i)}, uint64(i))
+		cl.AddMember(&Member{ID: types.ID(i)})
 	}
-	cl.RemoveMember(4, 5)
+	cl.RemoveMember(4)
 
 	tests := []struct {
 		cc raftpb.ConfChange
@@ -456,7 +455,7 @@ func TestApplyConfChangeError(t *testing.T) {
 			r:       raftNode{Node: n},
 			Cluster: cl,
 		}
-		_, err := srv.applyConfChange(tt.cc, nil, 10)
+		_, err := srv.applyConfChange(tt.cc, nil)
 		if err != tt.werr {
 			t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr)
 		}
@@ -476,9 +475,8 @@ func TestApplyConfChangeError(t *testing.T) {
 func TestApplyConfChangeShouldStop(t *testing.T) {
 	cl := newCluster("")
 	cl.SetStore(store.New())
-	cl.SetTransport(&nopTransporter{})
 	for i := 1; i <= 3; i++ {
-		cl.AddMember(&Member{ID: types.ID(i)}, uint64(i))
+		cl.AddMember(&Member{ID: types.ID(i)})
 	}
 	srv := &EtcdServer{
 		id: 1,
@@ -493,7 +491,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
 		NodeID: 2,
 	}
 	// remove non-local member
-	shouldStop, err := srv.applyConfChange(cc, &raftpb.ConfState{}, 10)
+	shouldStop, err := srv.applyConfChange(cc, &raftpb.ConfState{})
 	if err != nil {
 		t.Fatalf("unexpected error %v", err)
 	}
@@ -503,7 +501,7 @@
 
 	// remove local member
 	cc.NodeID = 1
-	shouldStop, err = srv.applyConfChange(cc, &raftpb.ConfState{}, 10)
+	shouldStop, err = srv.applyConfChange(cc, &raftpb.ConfState{})
 	if err != nil {
 		t.Fatalf("unexpected error %v", err)
 	}
@@ -774,7 +772,6 @@ func TestRecvSnapshot(t *testing.T) {
 	p := &storageRecorder{}
 	cl := newCluster("abc")
 	cl.SetStore(store.New())
-	cl.SetTransport(&nopTransporter{})
 	s := &EtcdServer{
 		r: raftNode{
 			Node: n,
@@ -809,7 +806,6 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) {
 	st := &storeRecorder{}
 	cl := newCluster("abc")
 	cl.SetStore(store.New())
-	cl.SetTransport(&nopTransporter{})
 	storage := raft.NewMemoryStorage()
 	s := &EtcdServer{
 		r: raftNode{
@@ -855,7 +851,6 @@ func TestAddMember(t *testing.T) {
 	cl := newTestCluster(nil)
 	st := store.New()
 	cl.SetStore(st)
-	cl.SetTransport(&nopTransporter{})
 	s := &EtcdServer{
 		r: raftNode{
 			Node: n,
@@ -894,7 +889,7 @@ func TestRemoveMember(t *testing.T) {
 	cl := newTestCluster(nil)
 	st := store.New()
 	cl.SetStore(store.New())
-	cl.SetTransport(&nopTransporter{})
+	cl.AddMember(&Member{ID: 1234})
 	s := &EtcdServer{
 		r: raftNode{
 			Node: n,
@@ -907,7 +902,6 @@
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
 	s.start()
-	s.AddMember(context.TODO(), Member{ID: 1234})
 	err := s.RemoveMember(context.TODO(), 1234)
 	gaction := n.Action()
 	s.Stop()
@@ -915,12 +909,7 @@
 	if err != nil {
 		t.Fatalf("RemoveMember error: %v", err)
 	}
-	wactions := []testutil.Action{
-		{Name: "ProposeConfChange:ConfChangeAddNode"},
-		{Name: "ApplyConfChange:ConfChangeAddNode"},
-		{Name: "ProposeConfChange:ConfChangeRemoveNode"},
-		{Name: "ApplyConfChange:ConfChangeRemoveNode"},
-	}
+	wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeRemoveNode"}, {Name: "ApplyConfChange:ConfChangeRemoveNode"}}
 	if !reflect.DeepEqual(gaction, wactions) {
 		t.Errorf("action = %v, want %v", gaction, wactions)
 	}
@@ -938,7 +927,7 @@ func TestUpdateMember(t *testing.T) {
 	cl := newTestCluster(nil)
 	st := store.New()
 	cl.SetStore(st)
-	cl.SetTransport(&nopTransporter{})
+	cl.AddMember(&Member{ID: 1234})
 	s := &EtcdServer{
 		r: raftNode{
 			Node: n,
@@ -951,7 +940,6 @@
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
 	s.start()
-	s.AddMember(context.TODO(), Member{ID: 1234})
 	wm := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}}
 	err := s.UpdateMember(context.TODO(), wm)
 	gaction := n.Action()
@@ -960,12 +948,7 @@
 	if err != nil {
 		t.Fatalf("UpdateMember error: %v", err)
 	}
-	wactions := []testutil.Action{
-		{Name: "ProposeConfChange:ConfChangeAddNode"},
-		{Name: "ApplyConfChange:ConfChangeAddNode"},
-		{Name: "ProposeConfChange:ConfChangeUpdateNode"},
-		{Name: "ApplyConfChange:ConfChangeUpdateNode"},
-	}
+	wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeUpdateNode"}, {Name: "ApplyConfChange:ConfChangeUpdateNode"}}
 	if !reflect.DeepEqual(gaction, wactions) {
 		t.Errorf("action = %v, want %v", gaction, wactions)
 	}
@@ -1383,6 +1366,7 @@ type nopTransporter struct{}
 
 func (s *nopTransporter) Handler() http.Handler { return nil }
 func (s *nopTransporter) Send(m []raftpb.Message) {}
+func (s *nopTransporter) AddRemote(id types.ID, us []string) {}
 func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
 func (s *nopTransporter) RemovePeer(id types.ID) {}
 func (s *nopTransporter) RemoveAllPeers() {}
@@ -625,7 +625,7 @@ func (m *member) Launch() error {
 	m.s.SyncTicker = time.Tick(500 * time.Millisecond)
 	m.s.Start()
 
-	m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s, m.s.RaftHandler())}
+	m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s.RaftHandler())}
 
 	for _, ln := range m.PeerListeners {
 		hs := &httptest.Server{
@@ -101,7 +101,7 @@ func (p *pipeline) handle() {
 				log.Printf("pipeline: the connection with %s became inactive", p.id)
 				p.active = false
 			}
-			if m.Type == raftpb.MsgApp {
+			if m.Type == raftpb.MsgApp && p.fs != nil {
 				p.fs.Fail()
 			}
 			p.r.ReportUnreachable(m.To)
@@ -114,7 +114,7 @@
 			p.active = true
 			p.errored = nil
 		}
-		if m.Type == raftpb.MsgApp {
+		if m.Type == raftpb.MsgApp && p.fs != nil {
 			p.fs.Succ(end.Sub(start))
 		}
 		if isMsgSnap(m) {
rafthttp/remote.go (new file, 48 lines)
@@ -0,0 +1,48 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+	"log"
+	"net/http"
+
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft/raftpb"
+)
+
+type remote struct {
+	id       types.ID
+	pipeline *pipeline
+}
+
+func startRemote(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r Raft, errorc chan error) *remote {
+	picker := newURLPicker(urls)
+	return &remote{
+		id:       to,
+		pipeline: newPipeline(tr, picker, to, cid, nil, r, errorc),
+	}
+}
+
+func (g *remote) Send(m raftpb.Message) {
+	select {
+	case g.pipeline.msgc <- m:
+	default:
+		log.Printf("remote: dropping %s to %s since pipeline with %d-size buffer is blocked", m.Type, g.id, pipelineBufSize)
+	}
+}
+
+func (g *remote) Stop() {
+	g.pipeline.stop()
+}
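remote.Send above is deliberately non-blocking: the select with a default arm drops and logs the message when the pipeline's buffered channel is full, so a slow or unreachable remote never stalls the raft loop. A standalone toy showing just that pattern (stub types and an arbitrary buffer size, not etcd API):

package main

import "fmt"

// send drops instead of blocking when the buffer is full,
// the same shape as remote.Send above.
func send(msgc chan string, m string) {
	select {
	case msgc <- m:
	default:
		fmt.Printf("dropping %q since buffer is blocked\n", m)
	}
}

func main() {
	msgc := make(chan string, 1) // buffer size arbitrary for the demo
	send(msgc, "MsgApp#1")       // buffered
	send(msgc, "MsgApp#2")       // dropped; the caller never blocks
}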
@@ -45,6 +45,12 @@ type Transporter interface {
 	// If the id cannot be found in the transport, the message
 	// will be ignored.
 	Send(m []raftpb.Message)
+	// AddRemote adds a remote with given peer urls into the transport.
+	// A remote helps newly joined member to catch up the progress of cluster,
+	// and will not be used after that.
+	// It is the caller's responsibility to ensure the urls are all vaild,
+	// or it panics.
+	AddRemote(id types.ID, urls []string)
 	// AddPeer adds a peer with given peer urls into the transport.
 	// It is the caller's responsibility to ensure the urls are all vaild,
 	// or it panics.
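The interface now distinguishes two receiver kinds: peers (committed members, maintained as conf changes apply) and remotes (endpoints used only while a newcomer catches up). transport.Send, further below, tries peers first and falls back to remotes. A standalone model of that dispatch order, with hypothetical stand-in types:

package main

import "fmt"

// stub stands in for rafthttp's Peer and remote types (hypothetical).
type stub struct{ kind string }

func (s stub) Send(msg string) { fmt.Printf("%s <- %s\n", s.kind, msg) }

// dispatch mirrors transport.Send below: committed peers win over
// remotes, and unknown receivers are only logged.
func dispatch(peers, remotes map[uint64]stub, to uint64, msg string) {
	if p, ok := peers[to]; ok {
		p.Send(msg)
		return
	}
	if g, ok := remotes[to]; ok {
		g.Send(msg)
		return
	}
	fmt.Printf("send message to unknown receiver %d\n", to)
}

func main() {
	peers := map[uint64]stub{1: {kind: "peer 1"}}
	remotes := map[uint64]stub{2: {kind: "remote 2"}}
	dispatch(peers, remotes, 1, "MsgApp") // full peer
	dispatch(peers, remotes, 2, "MsgApp") // catch-up remote
	dispatch(peers, remotes, 3, "MsgApp") // logged and dropped
}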
@@ -70,8 +76,9 @@ type transport struct {
 	serverStats *stats.ServerStats
 	leaderStats *stats.LeaderStats
 
-	mu    sync.RWMutex      // protect the peer map
-	peers map[types.ID]Peer // remote peers
+	mu      sync.RWMutex         // protect the remote and peer map
+	remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
+	peers   map[types.ID]Peer    // peers map
 	errorc chan error
 }
 
@@ -83,6 +90,7 @@ func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan
 		raft:        r,
 		serverStats: ss,
 		leaderStats: ls,
+		remotes:     make(map[types.ID]*remote),
 		peers:       make(map[types.ID]Peer),
 		errorc:      errorc,
 	}
@@ -110,21 +118,30 @@ func (t *transport) Send(msgs []raftpb.Message) {
 			continue
 		}
 		to := types.ID(m.To)
-		p, ok := t.peers[to]
-		if !ok {
-			log.Printf("etcdserver: send message to unknown receiver %s", to)
-			continue
-		}
 
-		if m.Type == raftpb.MsgApp {
-			t.serverStats.SendAppendReq(m.Size())
-		}
-
-		p.Send(m)
+		p, ok := t.peers[to]
+		if ok {
+			if m.Type == raftpb.MsgApp {
+				t.serverStats.SendAppendReq(m.Size())
+			}
+			p.Send(m)
+			continue
+		}
+
+		g, ok := t.remotes[to]
+		if ok {
+			g.Send(m)
+			continue
+		}
+
+		log.Printf("etcdserver: send message to unknown receiver %s", to)
 	}
 }
 
 func (t *transport) Stop() {
+	for _, r := range t.remotes {
+		r.Stop()
+	}
 	for _, p := range t.peers {
 		p.Stop()
 	}
@@ -133,14 +150,22 @@
 	}
 }
 
+func (t *transport) AddRemote(id types.ID, us []string) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	if _, ok := t.remotes[id]; ok {
+		return
+	}
+	urls, err := types.NewURLs(us)
+	if err != nil {
+		log.Panicf("newURLs %+v should never fail: %+v", us, err)
+	}
+	t.remotes[id] = startRemote(t.roundTripper, urls, t.id, id, t.clusterID, t.raft, t.errorc)
+}
+
 func (t *transport) AddPeer(id types.ID, us []string) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
-	// There is no need to build connection to itself because local message
-	// is not sent through transport.
-	if id == t.id {
-		return
-	}
 	if _, ok := t.peers[id]; ok {
 		return
 	}
@@ -155,9 +180,6 @@
 func (t *transport) RemovePeer(id types.ID) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
-	if id == t.id {
-		return
-	}
 	t.removePeer(id)
 }
 
@@ -183,9 +205,6 @@ func (t *transport) removePeer(id types.ID) {
 func (t *transport) UpdatePeer(id types.ID, us []string) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
-	if id == t.id {
-		return
-	}
 	// TODO: return error or just panic?
 	if _, ok := t.peers[id]; !ok {
 		return