mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
@@ -263,12 +263,13 @@ func (c *Cluster) SetStore(st store.Store) { c.store = st }
|
||||
// ensures that it is still valid.
|
||||
func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
|
||||
members, removed := membersFromStore(c.store)
|
||||
if removed[types.ID(cc.NodeID)] {
|
||||
id := types.ID(cc.NodeID)
|
||||
if removed[id] {
|
||||
return ErrIDRemoved
|
||||
}
|
||||
switch cc.Type {
|
||||
case raftpb.ConfChangeAddNode:
|
||||
if members[types.ID(cc.NodeID)] != nil {
|
||||
if members[id] != nil {
|
||||
return ErrIDExists
|
||||
}
|
||||
urls := make(map[string]bool)
|
||||
@@ -287,11 +288,33 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
|
||||
}
|
||||
}
|
||||
case raftpb.ConfChangeRemoveNode:
|
||||
if members[types.ID(cc.NodeID)] == nil {
|
||||
if members[id] == nil {
|
||||
return ErrIDNotFound
|
||||
}
|
||||
case raftpb.ConfChangeUpdateNode:
|
||||
if members[id] == nil {
|
||||
return ErrIDNotFound
|
||||
}
|
||||
urls := make(map[string]bool)
|
||||
for _, m := range members {
|
||||
if m.ID == id {
|
||||
continue
|
||||
}
|
||||
for _, u := range m.PeerURLs {
|
||||
urls[u] = true
|
||||
}
|
||||
}
|
||||
m := new(Member)
|
||||
if err := json.Unmarshal(cc.Context, m); err != nil {
|
||||
log.Panicf("unmarshal member should never fail: %v", err)
|
||||
}
|
||||
for _, u := range m.PeerURLs {
|
||||
if urls[u] {
|
||||
return ErrPeerURLexists
|
||||
}
|
||||
}
|
||||
default:
|
||||
log.Panicf("ConfChange type should be either AddNode or RemoveNode")
|
||||
log.Panicf("ConfChange type should be either AddNode, RemoveNode or UpdateNode")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -341,6 +364,20 @@ func (c *Cluster) UpdateMemberAttributes(id types.ID, attr Attributes) {
|
||||
c.members[id].Attributes = attr
|
||||
}
|
||||
|
||||
func (c *Cluster) UpdateMember(nm *Member) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
b, err := json.Marshal(nm.RaftAttributes)
|
||||
if err != nil {
|
||||
log.Panicf("marshal raftAttributes should never fail: %v", err)
|
||||
}
|
||||
p := path.Join(memberStoreKey(nm.ID), raftAttributesSuffix)
|
||||
if _, err := c.store.Update(p, string(b), store.Permanent); err != nil {
|
||||
log.Panicf("update raftAttributes should never fail: %v", err)
|
||||
}
|
||||
c.members[nm.ID].RaftAttributes = nm.RaftAttributes
|
||||
}
|
||||
|
||||
// nodeToMember builds member through a store node.
|
||||
// the child nodes of the given node should be sorted by key.
|
||||
func nodeToMember(n *store.NodeExtern) (*Member, error) {
|
||||
|
||||
@@ -362,7 +362,25 @@ func TestClusterValidateConfigurationChange(t *testing.T) {
|
||||
cl.RemoveMember(4)
|
||||
|
||||
attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}}
|
||||
cxt, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr})
|
||||
ctx, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
attr = RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 5)}}
|
||||
ctx5, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
attr = RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 3)}}
|
||||
ctx2to3, err := json.Marshal(&Member{ID: types.ID(2), RaftAttributes: attr})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
attr = RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 5)}}
|
||||
ctx2to5, err := json.Marshal(&Member{ID: types.ID(2), RaftAttributes: attr})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -403,7 +421,7 @@ func TestClusterValidateConfigurationChange(t *testing.T) {
|
||||
raftpb.ConfChange{
|
||||
Type: raftpb.ConfChangeAddNode,
|
||||
NodeID: 5,
|
||||
Context: cxt,
|
||||
Context: ctx,
|
||||
},
|
||||
ErrPeerURLexists,
|
||||
},
|
||||
@@ -414,6 +432,39 @@ func TestClusterValidateConfigurationChange(t *testing.T) {
|
||||
},
|
||||
ErrIDNotFound,
|
||||
},
|
||||
{
|
||||
raftpb.ConfChange{
|
||||
Type: raftpb.ConfChangeAddNode,
|
||||
NodeID: 5,
|
||||
Context: ctx5,
|
||||
},
|
||||
nil,
|
||||
},
|
||||
{
|
||||
raftpb.ConfChange{
|
||||
Type: raftpb.ConfChangeUpdateNode,
|
||||
NodeID: 5,
|
||||
Context: ctx,
|
||||
},
|
||||
ErrIDNotFound,
|
||||
},
|
||||
// try to change the peer url of 2 to the peer url of 3
|
||||
{
|
||||
raftpb.ConfChange{
|
||||
Type: raftpb.ConfChangeUpdateNode,
|
||||
NodeID: 2,
|
||||
Context: ctx2to3,
|
||||
},
|
||||
ErrPeerURLexists,
|
||||
},
|
||||
{
|
||||
raftpb.ConfChange{
|
||||
Type: raftpb.ConfChangeUpdateNode,
|
||||
NodeID: 2,
|
||||
Context: ctx2to5,
|
||||
},
|
||||
nil,
|
||||
},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
err := cl.ValidateConfigurationChange(tt.cc)
|
||||
|
||||
@@ -148,7 +148,7 @@ type membersHandler struct {
|
||||
}
|
||||
|
||||
func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
if !allowMethod(w, r.Method, "GET", "POST", "DELETE") {
|
||||
if !allowMethod(w, r.Method, "GET", "POST", "DELETE", "PUT") {
|
||||
return
|
||||
}
|
||||
w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
|
||||
@@ -168,25 +168,13 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
log.Printf("etcdhttp: %v", err)
|
||||
}
|
||||
case "POST":
|
||||
ctype := r.Header.Get("Content-Type")
|
||||
if ctype != "application/json" {
|
||||
writeError(w, httptypes.NewHTTPError(http.StatusUnsupportedMediaType, fmt.Sprintf("Bad Content-Type %s, accept application/json", ctype)))
|
||||
return
|
||||
}
|
||||
b, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error()))
|
||||
return
|
||||
}
|
||||
req := httptypes.MemberCreateRequest{}
|
||||
if err := json.Unmarshal(b, &req); err != nil {
|
||||
writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error()))
|
||||
if ok := unmarshalRequest(r, &req, w); !ok {
|
||||
return
|
||||
}
|
||||
|
||||
now := h.clock.Now()
|
||||
m := etcdserver.NewMember("", req.PeerURLs, "", &now)
|
||||
err = h.server.AddMember(ctx, *m)
|
||||
err := h.server.AddMember(ctx, *m)
|
||||
switch {
|
||||
case err == etcdserver.ErrIDExists || err == etcdserver.ErrPeerURLexists:
|
||||
writeError(w, httptypes.NewHTTPError(http.StatusConflict, err.Error()))
|
||||
@@ -203,28 +191,47 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
log.Printf("etcdhttp: %v", err)
|
||||
}
|
||||
case "DELETE":
|
||||
idStr := trimPrefix(r.URL.Path, membersPrefix)
|
||||
if idStr == "" {
|
||||
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
|
||||
id, ok := getID(r.URL.Path, w)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
id, err := types.IDFromString(idStr)
|
||||
if err != nil {
|
||||
writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", idStr)))
|
||||
return
|
||||
}
|
||||
err = h.server.RemoveMember(ctx, uint64(id))
|
||||
err := h.server.RemoveMember(ctx, uint64(id))
|
||||
switch {
|
||||
case err == etcdserver.ErrIDRemoved:
|
||||
writeError(w, httptypes.NewHTTPError(http.StatusGone, fmt.Sprintf("Member permanently removed: %s", idStr)))
|
||||
writeError(w, httptypes.NewHTTPError(http.StatusGone, fmt.Sprintf("Member permanently removed: %s", id)))
|
||||
case err == etcdserver.ErrIDNotFound:
|
||||
writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", idStr)))
|
||||
writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id)))
|
||||
case err != nil:
|
||||
log.Printf("etcdhttp: error removing node %s: %v", id, err)
|
||||
writeError(w, err)
|
||||
default:
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
case "PUT":
|
||||
id, ok := getID(r.URL.Path, w)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
req := httptypes.MemberUpdateRequest{}
|
||||
if ok := unmarshalRequest(r, &req, w); !ok {
|
||||
return
|
||||
}
|
||||
m := etcdserver.Member{
|
||||
ID: id,
|
||||
RaftAttributes: etcdserver.RaftAttributes{PeerURLs: req.PeerURLs.StringSlice()},
|
||||
}
|
||||
err := h.server.UpdateMember(ctx, m)
|
||||
switch {
|
||||
case err == etcdserver.ErrPeerURLexists:
|
||||
writeError(w, httptypes.NewHTTPError(http.StatusConflict, err.Error()))
|
||||
case err == etcdserver.ErrIDNotFound:
|
||||
writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id)))
|
||||
case err != nil:
|
||||
log.Printf("etcdhttp: error updating node %s: %v", m.ID, err)
|
||||
writeError(w, err)
|
||||
default:
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -506,6 +513,38 @@ func trimErrorPrefix(err error, prefix string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func unmarshalRequest(r *http.Request, req json.Unmarshaler, w http.ResponseWriter) bool {
|
||||
ctype := r.Header.Get("Content-Type")
|
||||
if ctype != "application/json" {
|
||||
writeError(w, httptypes.NewHTTPError(http.StatusUnsupportedMediaType, fmt.Sprintf("Bad Content-Type %s, accept application/json", ctype)))
|
||||
return false
|
||||
}
|
||||
b, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error()))
|
||||
return false
|
||||
}
|
||||
if err := req.UnmarshalJSON(b); err != nil {
|
||||
writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error()))
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func getID(p string, w http.ResponseWriter) (types.ID, bool) {
|
||||
idStr := trimPrefix(p, membersPrefix)
|
||||
if idStr == "" {
|
||||
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
|
||||
return 0, false
|
||||
}
|
||||
id, err := types.IDFromString(idStr)
|
||||
if err != nil {
|
||||
writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", idStr)))
|
||||
return 0, false
|
||||
}
|
||||
return id, true
|
||||
}
|
||||
|
||||
// getUint64 extracts a uint64 by the given key from a Form. If the key does
|
||||
// not exist in the form, 0 is returned. If the key exists but the value is
|
||||
// badly formed, an error is returned. If multiple values are present only the
|
||||
|
||||
@@ -111,6 +111,11 @@ func (s *serverRecorder) RemoveMember(_ context.Context, id uint64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *serverRecorder) UpdateMember(_ context.Context, m etcdserver.Member) error {
|
||||
s.actions = append(s.actions, action{name: "UpdateMember", params: []interface{}{m}})
|
||||
return nil
|
||||
}
|
||||
|
||||
type action struct {
|
||||
name string
|
||||
params []interface{}
|
||||
@@ -136,11 +141,12 @@ type resServer struct {
|
||||
func (rs *resServer) Do(_ context.Context, _ etcdserverpb.Request) (etcdserver.Response, error) {
|
||||
return rs.res, nil
|
||||
}
|
||||
func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil }
|
||||
func (rs *resServer) Start() {}
|
||||
func (rs *resServer) Stop() {}
|
||||
func (rs *resServer) AddMember(_ context.Context, _ etcdserver.Member) error { return nil }
|
||||
func (rs *resServer) RemoveMember(_ context.Context, _ uint64) error { return nil }
|
||||
func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil }
|
||||
func (rs *resServer) Start() {}
|
||||
func (rs *resServer) Stop() {}
|
||||
func (rs *resServer) AddMember(_ context.Context, _ etcdserver.Member) error { return nil }
|
||||
func (rs *resServer) RemoveMember(_ context.Context, _ uint64) error { return nil }
|
||||
func (rs *resServer) UpdateMember(_ context.Context, _ etcdserver.Member) error { return nil }
|
||||
|
||||
func boolp(b bool) *bool { return &b }
|
||||
|
||||
@@ -698,6 +704,48 @@ func TestServeMembersDelete(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestServeMembersUpdate(t *testing.T) {
|
||||
u := mustNewURL(t, path.Join(membersPrefix, "1"))
|
||||
b := []byte(`{"peerURLs":["http://127.0.0.1:1"]}`)
|
||||
req, err := http.NewRequest("PUT", u.String(), bytes.NewReader(b))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
s := &serverRecorder{}
|
||||
h := &membersHandler{
|
||||
server: s,
|
||||
clock: clockwork.NewFakeClock(),
|
||||
clusterInfo: &fakeCluster{id: 1},
|
||||
}
|
||||
rw := httptest.NewRecorder()
|
||||
|
||||
h.ServeHTTP(rw, req)
|
||||
|
||||
wcode := http.StatusNoContent
|
||||
if rw.Code != wcode {
|
||||
t.Errorf("code=%d, want %d", rw.Code, wcode)
|
||||
}
|
||||
|
||||
gcid := rw.Header().Get("X-Etcd-Cluster-ID")
|
||||
wcid := h.clusterInfo.ID().String()
|
||||
if gcid != wcid {
|
||||
t.Errorf("cid = %s, want %s", gcid, wcid)
|
||||
}
|
||||
|
||||
wm := etcdserver.Member{
|
||||
ID: 1,
|
||||
RaftAttributes: etcdserver.RaftAttributes{
|
||||
PeerURLs: []string{"http://127.0.0.1:1"},
|
||||
},
|
||||
}
|
||||
|
||||
wactions := []action{{name: "UpdateMember", params: []interface{}{wm}}}
|
||||
if !reflect.DeepEqual(s.actions, wactions) {
|
||||
t.Errorf("actions = %+v, want %+v", s.actions, wactions)
|
||||
}
|
||||
}
|
||||
|
||||
func TestServeMembersFail(t *testing.T) {
|
||||
tests := []struct {
|
||||
req *http.Request
|
||||
@@ -855,6 +903,104 @@ func TestServeMembersFail(t *testing.T) {
|
||||
},
|
||||
nil,
|
||||
|
||||
http.StatusMethodNotAllowed,
|
||||
},
|
||||
{
|
||||
// parse body error
|
||||
&http.Request{
|
||||
URL: mustNewURL(t, path.Join(membersPrefix, "0")),
|
||||
Method: "PUT",
|
||||
Body: ioutil.NopCloser(strings.NewReader("bad json")),
|
||||
Header: map[string][]string{"Content-Type": []string{"application/json"}},
|
||||
},
|
||||
&resServer{},
|
||||
|
||||
http.StatusBadRequest,
|
||||
},
|
||||
{
|
||||
// bad content type
|
||||
&http.Request{
|
||||
URL: mustNewURL(t, path.Join(membersPrefix, "0")),
|
||||
Method: "PUT",
|
||||
Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)),
|
||||
Header: map[string][]string{"Content-Type": []string{"application/bad"}},
|
||||
},
|
||||
&errServer{},
|
||||
|
||||
http.StatusUnsupportedMediaType,
|
||||
},
|
||||
{
|
||||
// bad url
|
||||
&http.Request{
|
||||
URL: mustNewURL(t, path.Join(membersPrefix, "0")),
|
||||
Method: "PUT",
|
||||
Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://a"]}`)),
|
||||
Header: map[string][]string{"Content-Type": []string{"application/json"}},
|
||||
},
|
||||
&errServer{},
|
||||
|
||||
http.StatusBadRequest,
|
||||
},
|
||||
{
|
||||
// etcdserver.UpdateMember error
|
||||
&http.Request{
|
||||
URL: mustNewURL(t, path.Join(membersPrefix, "0")),
|
||||
Method: "PUT",
|
||||
Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)),
|
||||
Header: map[string][]string{"Content-Type": []string{"application/json"}},
|
||||
},
|
||||
&errServer{
|
||||
errors.New("blah"),
|
||||
},
|
||||
|
||||
http.StatusInternalServerError,
|
||||
},
|
||||
{
|
||||
// etcdserver.UpdateMember error
|
||||
&http.Request{
|
||||
URL: mustNewURL(t, path.Join(membersPrefix, "0")),
|
||||
Method: "PUT",
|
||||
Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)),
|
||||
Header: map[string][]string{"Content-Type": []string{"application/json"}},
|
||||
},
|
||||
&errServer{
|
||||
etcdserver.ErrPeerURLexists,
|
||||
},
|
||||
|
||||
http.StatusConflict,
|
||||
},
|
||||
{
|
||||
// etcdserver.UpdateMember error
|
||||
&http.Request{
|
||||
URL: mustNewURL(t, path.Join(membersPrefix, "0")),
|
||||
Method: "PUT",
|
||||
Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)),
|
||||
Header: map[string][]string{"Content-Type": []string{"application/json"}},
|
||||
},
|
||||
&errServer{
|
||||
etcdserver.ErrIDNotFound,
|
||||
},
|
||||
|
||||
http.StatusNotFound,
|
||||
},
|
||||
{
|
||||
// etcdserver.UpdateMember error with badly formed ID
|
||||
&http.Request{
|
||||
URL: mustNewURL(t, path.Join(membersPrefix, "bad_id")),
|
||||
Method: "PUT",
|
||||
},
|
||||
nil,
|
||||
|
||||
http.StatusNotFound,
|
||||
},
|
||||
{
|
||||
// etcdserver.UpdateMember with no ID
|
||||
&http.Request{
|
||||
URL: mustNewURL(t, membersPrefix),
|
||||
Method: "PUT",
|
||||
},
|
||||
nil,
|
||||
|
||||
http.StatusMethodNotAllowed,
|
||||
},
|
||||
}
|
||||
@@ -995,6 +1141,43 @@ func TestServeMachines(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetID(t *testing.T) {
|
||||
tests := []struct {
|
||||
path string
|
||||
|
||||
wok bool
|
||||
wid types.ID
|
||||
wcode int
|
||||
}{
|
||||
{
|
||||
"123",
|
||||
true, 0x123, http.StatusOK,
|
||||
},
|
||||
{
|
||||
"bad_id",
|
||||
false, 0, http.StatusNotFound,
|
||||
},
|
||||
{
|
||||
"",
|
||||
false, 0, http.StatusMethodNotAllowed,
|
||||
},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
w := httptest.NewRecorder()
|
||||
id, ok := getID(tt.path, w)
|
||||
if id != tt.wid {
|
||||
t.Errorf("#%d: id = %d, want %d", i, id, tt.wid)
|
||||
}
|
||||
if ok != tt.wok {
|
||||
t.Errorf("#%d: ok = %t, want %t", i, ok, tt.wok)
|
||||
}
|
||||
if w.Code != tt.wcode {
|
||||
t.Errorf("#%d code = %d, want %d", i, w.Code, tt.wcode)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type dummyStats struct {
|
||||
data []byte
|
||||
}
|
||||
|
||||
@@ -79,6 +79,9 @@ func (fs *errServer) AddMember(ctx context.Context, m etcdserver.Member) error {
|
||||
func (fs *errServer) RemoveMember(ctx context.Context, id uint64) error {
|
||||
return fs.err
|
||||
}
|
||||
func (fs *errServer) UpdateMember(ctx context.Context, m etcdserver.Member) error {
|
||||
return fs.err
|
||||
}
|
||||
|
||||
func TestWriteError(t *testing.T) {
|
||||
// nil error should not panic
|
||||
|
||||
@@ -33,6 +33,10 @@ type MemberCreateRequest struct {
|
||||
PeerURLs types.URLs
|
||||
}
|
||||
|
||||
type MemberUpdateRequest struct {
|
||||
MemberCreateRequest
|
||||
}
|
||||
|
||||
func (m *MemberCreateRequest) MarshalJSON() ([]byte, error) {
|
||||
s := struct {
|
||||
PeerURLs []string `json:"peerURLs"`
|
||||
|
||||
@@ -21,6 +21,9 @@ import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/stats"
|
||||
@@ -108,12 +111,30 @@ func (h *sendHub) Remove(id types.ID) {
|
||||
delete(h.senders, id)
|
||||
}
|
||||
|
||||
func (h *sendHub) Update(m *Member) {
|
||||
// TODO: return error or just panic?
|
||||
if _, ok := h.senders[m.ID]; !ok {
|
||||
return
|
||||
}
|
||||
peerURL := m.PickPeerURL()
|
||||
u, err := url.Parse(peerURL)
|
||||
if err != nil {
|
||||
log.Panicf("unexpect peer url %s", peerURL)
|
||||
}
|
||||
u.Path = path.Join(u.Path, raftPrefix)
|
||||
s := h.senders[m.ID]
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.u = u.String()
|
||||
}
|
||||
|
||||
type sender struct {
|
||||
u string
|
||||
cid types.ID
|
||||
c *http.Client
|
||||
fs *stats.FollowerStats
|
||||
q chan []byte
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
func newSender(u string, cid types.ID, c *http.Client, fs *stats.FollowerStats) *sender {
|
||||
@@ -159,7 +180,9 @@ func (s *sender) handle() {
|
||||
// post POSTs a data payload to a url. Returns nil if the POST succeeds,
|
||||
// error on any failure.
|
||||
func (s *sender) post(data []byte) error {
|
||||
s.mu.RLock()
|
||||
req, err := http.NewRequest("POST", s.u, bytes.NewBuffer(data))
|
||||
s.mu.RUnlock()
|
||||
if err != nil {
|
||||
return fmt.Errorf("new request to %s error: %v", s.u, err)
|
||||
}
|
||||
|
||||
@@ -89,6 +89,7 @@ type Sender interface {
|
||||
Send(m []raftpb.Message)
|
||||
Add(m *Member)
|
||||
Remove(id types.ID)
|
||||
Update(m *Member)
|
||||
Stop()
|
||||
}
|
||||
|
||||
@@ -114,7 +115,7 @@ type Server interface {
|
||||
// Stop terminates the Server and performs any necessary finalization.
|
||||
// Do and Process cannot be called after Stop has been invoked.
|
||||
Stop()
|
||||
// Do takes a request and attempts to fulfil it, returning a Response.
|
||||
// Do takes a request and attempts to fulfill it, returning a Response.
|
||||
Do(ctx context.Context, r pb.Request) (Response, error)
|
||||
// Process takes a raft message and applies it to the server's raft state
|
||||
// machine, respecting any timeout of the given context.
|
||||
@@ -127,6 +128,10 @@ type Server interface {
|
||||
// return ErrIDRemoved if member ID is removed from the cluster, or return
|
||||
// ErrIDNotFound if member ID is not in the cluster.
|
||||
RemoveMember(ctx context.Context, id uint64) error
|
||||
|
||||
// UpdateMember attempts to update a existing member in the cluster. It will
|
||||
// return ErrIDNotFound if the member ID does not exist.
|
||||
UpdateMember(ctx context.Context, updateMemb Member) error
|
||||
}
|
||||
|
||||
type Stats interface {
|
||||
@@ -475,6 +480,20 @@ func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) error {
|
||||
return s.configure(ctx, cc)
|
||||
}
|
||||
|
||||
func (s *EtcdServer) UpdateMember(ctx context.Context, memb Member) error {
|
||||
b, err := json.Marshal(memb)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cc := raftpb.ConfChange{
|
||||
ID: GenID(),
|
||||
Type: raftpb.ConfChangeUpdateNode,
|
||||
NodeID: uint64(memb.ID),
|
||||
Context: b,
|
||||
}
|
||||
return s.configure(ctx, cc)
|
||||
}
|
||||
|
||||
// Implement the RaftTimer interface
|
||||
func (s *EtcdServer) Index() uint64 {
|
||||
return atomic.LoadUint64(&s.raftIndex)
|
||||
@@ -672,6 +691,17 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error {
|
||||
s.Cluster.RemoveMember(id)
|
||||
s.sender.Remove(id)
|
||||
log.Printf("etcdserver: removed node %s from cluster %s", id, s.Cluster.ID())
|
||||
case raftpb.ConfChangeUpdateNode:
|
||||
m := new(Member)
|
||||
if err := json.Unmarshal(cc.Context, m); err != nil {
|
||||
log.Panicf("unmarshal member should never fail: %v", err)
|
||||
}
|
||||
if cc.NodeID != uint64(m.ID) {
|
||||
log.Panicf("nodeID should always be equal to member ID")
|
||||
}
|
||||
s.Cluster.UpdateMember(m)
|
||||
s.sender.Update(m)
|
||||
log.Printf("etcdserver: update node %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -447,7 +447,7 @@ func TestApplyConfChangeError(t *testing.T) {
|
||||
},
|
||||
{
|
||||
raftpb.ConfChange{
|
||||
Type: raftpb.ConfChangeRemoveNode,
|
||||
Type: raftpb.ConfChangeUpdateNode,
|
||||
NodeID: 4,
|
||||
},
|
||||
ErrIDRemoved,
|
||||
@@ -503,6 +503,7 @@ func (s *fakeSender) Send(msgs []raftpb.Message) {
|
||||
}
|
||||
}
|
||||
func (s *fakeSender) Add(m *Member) {}
|
||||
func (s *fakeSender) Update(m *Member) {}
|
||||
func (s *fakeSender) Remove(id types.ID) {}
|
||||
func (s *fakeSender) Stop() {}
|
||||
|
||||
@@ -1017,6 +1018,41 @@ func TestRemoveMember(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestUpdateMember tests RemoveMember can propose and perform node update.
|
||||
func TestUpdateMember(t *testing.T) {
|
||||
n := newNodeConfChangeCommitterRecorder()
|
||||
n.readyc <- raft.Ready{
|
||||
SoftState: &raft.SoftState{
|
||||
RaftState: raft.StateLeader,
|
||||
Nodes: []uint64{1234, 2345, 3456},
|
||||
},
|
||||
}
|
||||
cl := newTestCluster([]*Member{{ID: 1234}})
|
||||
s := &EtcdServer{
|
||||
node: n,
|
||||
store: &storeRecorder{},
|
||||
sender: &nopSender{},
|
||||
storage: &storageRecorder{},
|
||||
Cluster: cl,
|
||||
}
|
||||
s.start()
|
||||
wm := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}}
|
||||
err := s.UpdateMember(context.TODO(), wm)
|
||||
gaction := n.Action()
|
||||
s.Stop()
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("UpdateMember error: %v", err)
|
||||
}
|
||||
wactions := []action{action{name: "ProposeConfChange:ConfChangeUpdateNode"}, action{name: "ApplyConfChange:ConfChangeUpdateNode"}}
|
||||
if !reflect.DeepEqual(gaction, wactions) {
|
||||
t.Errorf("action = %v, want %v", gaction, wactions)
|
||||
}
|
||||
if !reflect.DeepEqual(cl.Member(1234), &wm) {
|
||||
t.Errorf("member = %v, want %v", cl.Member(1234), &wm)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: test server could stop itself when being removed
|
||||
|
||||
// TODO: test wait trigger correctness in multi-server case
|
||||
@@ -1446,6 +1482,7 @@ type nopSender struct{}
|
||||
func (s *nopSender) Send(m []raftpb.Message) {}
|
||||
func (s *nopSender) Add(m *Member) {}
|
||||
func (s *nopSender) Remove(id types.ID) {}
|
||||
func (s *nopSender) Update(m *Member) {}
|
||||
func (s *nopSender) Stop() {}
|
||||
|
||||
func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer {
|
||||
|
||||
@@ -271,6 +271,8 @@ func (n *node) run(r *raft) {
|
||||
r.addNode(cc.NodeID)
|
||||
case pb.ConfChangeRemoveNode:
|
||||
r.removeNode(cc.NodeID)
|
||||
case pb.ConfChangeUpdateNode:
|
||||
r.resetPendingConf()
|
||||
default:
|
||||
panic("unexpected conf type")
|
||||
}
|
||||
|
||||
@@ -120,15 +120,18 @@ type ConfChangeType int32
|
||||
const (
|
||||
ConfChangeAddNode ConfChangeType = 0
|
||||
ConfChangeRemoveNode ConfChangeType = 1
|
||||
ConfChangeUpdateNode ConfChangeType = 2
|
||||
)
|
||||
|
||||
var ConfChangeType_name = map[int32]string{
|
||||
0: "ConfChangeAddNode",
|
||||
1: "ConfChangeRemoveNode",
|
||||
2: "ConfChangeUpdateNode",
|
||||
}
|
||||
var ConfChangeType_value = map[string]int32{
|
||||
"ConfChangeAddNode": 0,
|
||||
"ConfChangeRemoveNode": 1,
|
||||
"ConfChangeUpdateNode": 2,
|
||||
}
|
||||
|
||||
func (x ConfChangeType) Enum() *ConfChangeType {
|
||||
|
||||
@@ -60,6 +60,7 @@ message HardState {
|
||||
enum ConfChangeType {
|
||||
ConfChangeAddNode = 0;
|
||||
ConfChangeRemoveNode = 1;
|
||||
ConfChangeUpdateNode = 2;
|
||||
}
|
||||
|
||||
message ConfChange {
|
||||
|
||||
Reference in New Issue
Block a user