diff --git a/etcdserver/cluster_store.go b/etcdserver/cluster_store.go index c36655070..3835e237f 100644 --- a/etcdserver/cluster_store.go +++ b/etcdserver/cluster_store.go @@ -43,6 +43,7 @@ type ClusterStore interface { Add(m Member) Get() Cluster Remove(id uint64) + IsRemoved(id uint64) bool } type clusterStore struct { @@ -121,10 +122,26 @@ func nodeToMember(n *store.NodeExtern) (Member, error) { // Remove removes a member from the store. // The given id MUST exist. func (s *clusterStore) Remove(id uint64) { - p := s.Get().FindID(id).storeKey() - if _, err := s.Store.Delete(p, true, true); err != nil { + if _, err := s.Store.Delete(Member{ID: id}.storeKey(), true, true); err != nil { log.Panicf("delete peer should never fail: %v", err) } + if _, err := s.Store.Create(removedMemberStoreKey(id), false, "", false, store.Permanent); err != nil { + log.Panicf("unexpected creating removed member error: %v", err) + } +} + +func (s *clusterStore) IsRemoved(id uint64) bool { + _, err := s.Store.Get(removedMemberStoreKey(id), false, false) + switch v := err.(type) { + case nil: + return true + case *etcdErr.Error: + if v.ErrorCode == etcdErr.EcodeKeyNotFound { + return false + } + } + log.Panicf("unexpected getting removed member error: %v", err) + return false } // Sender creates the default production sender used to transport raft messages @@ -206,6 +223,10 @@ func httpPost(c *http.Client, url string, cid uint64, data []byte) bool { // TODO: shutdown the etcdserver gracefully? log.Panicf("clusterID mismatch") return false + case http.StatusForbidden: + // TODO: stop the server + log.Panicf("the member has been removed") + return false case http.StatusNoContent: return true default: diff --git a/etcdserver/cluster_store_test.go b/etcdserver/cluster_store_test.go index 202a4f13b..35720d18d 100644 --- a/etcdserver/cluster_store_test.go +++ b/etcdserver/cluster_store_test.go @@ -108,15 +108,39 @@ func TestClusterStoreGet(t *testing.T) { } } -func TestClusterStoreDelete(t *testing.T) { - st := newStoreGetAllAndDeleteRecorder() +func TestClusterStoreRemove(t *testing.T) { + st := &storeRecorder{} cs := &clusterStore{Store: st} - cs.Add(newTestMember(1, nil, "node1", nil)) cs.Remove(1) - wdeletes := []string{path.Join(storeMembersPrefix, "1")} - if !reflect.DeepEqual(st.deletes, wdeletes) { - t.Errorf("deletes = %v, want %v", st.deletes, wdeletes) + wactions := []action{ + {name: "Delete", params: []interface{}{Member{ID: 1}.storeKey(), true, true}}, + {name: "Create", params: []interface{}{removedMemberStoreKey(1), false, "", false, store.Permanent}}, + } + if !reflect.DeepEqual(st.Action(), wactions) { + t.Errorf("actions = %v, want %v", st.Action(), wactions) + } +} + +func TestClusterStoreIsRemovedFalse(t *testing.T) { + st := &errStoreRecorder{err: etcdErr.NewError(etcdErr.EcodeKeyNotFound, "", 0)} + cs := clusterStore{Store: st} + if ok := cs.IsRemoved(1); ok != false { + t.Errorf("IsRemoved = %v, want %v", ok, false) + } +} + +func TestClusterStoreIsRemovedTrue(t *testing.T) { + st := &storeRecorder{} + cs := &clusterStore{Store: st} + if ok := cs.IsRemoved(1); ok != true { + t.Errorf("IsRemoved = %v, want %v", ok, true) + } + wactions := []action{ + {name: "Get", params: []interface{}{removedMemberStoreKey(1), false, false}}, + } + if !reflect.DeepEqual(st.Action(), wactions) { + t.Errorf("actions = %v, want %v", st.Action(), wactions) } } @@ -201,20 +225,6 @@ func newGetAllStore() *getAllStore { return &getAllStore{store.New()} } -type storeGetAllAndDeleteRecorder struct { - *getAllStore - deletes []string -} - -func newStoreGetAllAndDeleteRecorder() *storeGetAllAndDeleteRecorder { - return &storeGetAllAndDeleteRecorder{getAllStore: newGetAllStore()} -} - -func (s *storeGetAllAndDeleteRecorder) Delete(key string, _, _ bool) (*store.Event, error) { - s.deletes = append(s.deletes, key) - return nil, nil -} - func newTestMember(id uint64, peerURLs []string, name string, clientURLs []string) Member { return Member{ ID: id, diff --git a/etcdserver/etcdhttp/http.go b/etcdserver/etcdhttp/http.go index b5be1110b..7e63e8f74 100644 --- a/etcdserver/etcdhttp/http.go +++ b/etcdserver/etcdhttp/http.go @@ -243,7 +243,12 @@ func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) { } if err := h.server.Process(context.TODO(), m); err != nil { log.Println("etcdhttp: error processing raft message:", err) - writeError(w, err) + switch err { + case etcdserver.ErrRemoved: + http.Error(w, "unexpected message from removed node", http.StatusForbidden) + default: + writeError(w, err) + } return } w.WriteHeader(http.StatusNoContent) diff --git a/etcdserver/etcdhttp/http_test.go b/etcdserver/etcdhttp/http_test.go index 659aa71ff..61e1fda59 100644 --- a/etcdserver/etcdhttp/http_test.go +++ b/etcdserver/etcdhttp/http_test.go @@ -922,7 +922,7 @@ func TestServeRaft(t *testing.T) { http.StatusBadRequest, }, { - // good request, etcdserver.Server error + // good request, etcdserver.Server internal error "POST", bytes.NewReader( mustMarshalMsg( @@ -934,6 +934,19 @@ func TestServeRaft(t *testing.T) { "0", http.StatusInternalServerError, }, + { + // good request from removed member + "POST", + bytes.NewReader( + mustMarshalMsg( + t, + raftpb.Message{}, + ), + ), + etcdserver.ErrRemoved, + "0", + http.StatusForbidden, + }, { // good request "POST", @@ -1654,3 +1667,5 @@ func (c *fakeCluster) Get() etcdserver.Cluster { } func (c *fakeCluster) Remove(id uint64) { return } + +func (c *fakeCluster) IsRemoved(id uint64) bool { return false } diff --git a/etcdserver/member.go b/etcdserver/member.go index d56377dac..3828e32a7 100644 --- a/etcdserver/member.go +++ b/etcdserver/member.go @@ -79,3 +79,7 @@ func parseMemberID(key string) uint64 { } return id } + +func removedMemberStoreKey(id uint64) string { + return path.Join(storeRemovedMembersPrefix, idAsHex(id)) +} diff --git a/etcdserver/server.go b/etcdserver/server.go index 89a51a745..2a8fe94a2 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -56,11 +56,13 @@ const ( var ( ErrUnknownMethod = errors.New("etcdserver: unknown method") ErrStopped = errors.New("etcdserver: server stopped") + ErrRemoved = errors.New("etcdserver: server removed") ErrIDRemoved = errors.New("etcdserver: ID removed") ErrIDExists = errors.New("etcdserver: ID exists") ErrIDNotFound = errors.New("etcdserver: ID not found") - storeMembersPrefix = path.Join(StoreAdminPrefix, "members") + storeMembersPrefix = path.Join(StoreAdminPrefix, "members") + storeRemovedMembersPrefix = path.Join(StoreAdminPrefix, "removed_members") ) func init() { @@ -265,6 +267,9 @@ func (s *EtcdServer) start() { } func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { + if s.ClusterStore.IsRemoved(m.From) { + return ErrRemoved + } return s.node.Step(ctx, m) } @@ -585,7 +590,7 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response { } func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes []uint64) error { - if err := checkConfChange(cc, nodes); err != nil { + if err := s.checkConfChange(cc, nodes); err != nil { cc.NodeID = raft.None s.node.ApplyConfChange(cc) return err @@ -607,7 +612,10 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes []uint64) error return nil } -func checkConfChange(cc raftpb.ConfChange, nodes []uint64) error { +func (s *EtcdServer) checkConfChange(cc raftpb.ConfChange, nodes []uint64) error { + if s.ClusterStore.IsRemoved(cc.NodeID) { + return ErrIDRemoved + } switch cc.Type { case raftpb.ConfChangeAddNode: if containsUint64(nodes, cc.NodeID) { diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 45858a871..52e517a35 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -386,10 +386,25 @@ func TestApplyRequest(t *testing.T) { // TODO: test ErrIDRemoved func TestApplyConfChangeError(t *testing.T) { nodes := []uint64{1, 2, 3} + removed := map[uint64]bool{4: true} tests := []struct { cc raftpb.ConfChange werr error }{ + { + raftpb.ConfChange{ + Type: raftpb.ConfChangeAddNode, + NodeID: 4, + }, + ErrIDRemoved, + }, + { + raftpb.ConfChange{ + Type: raftpb.ConfChangeRemoveNode, + NodeID: 4, + }, + ErrIDRemoved, + }, { raftpb.ConfChange{ Type: raftpb.ConfChangeAddNode, @@ -407,8 +422,10 @@ func TestApplyConfChangeError(t *testing.T) { } for i, tt := range tests { n := &nodeRecorder{} + cs := &removedClusterStore{removed: removed} srv := &EtcdServer{ - node: n, + node: n, + ClusterStore: cs, } err := srv.applyConfChange(tt.cc, nodes) if err != tt.werr { @@ -950,8 +967,8 @@ func TestPublish(t *testing.T) { t.Errorf("method = %s, want PUT", r.Method) } wm := Member{ID: 1, Attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}} - if r.Path != wm.storeKey()+attributesSuffix { - t.Errorf("path = %s, want %s", r.Path, wm.storeKey()+attributesSuffix) + if w := wm.storeKey() + attributesSuffix; r.Path != w { + t.Errorf("path = %s, want %s", r.Path, w) } var gattr Attributes if err := json.Unmarshal([]byte(r.Val), &gattr); err != nil { @@ -1312,6 +1329,16 @@ func (cs *clusterStoreRecorder) Get() Cluster { func (cs *clusterStoreRecorder) Remove(id uint64) { cs.record(action{name: "Remove", params: []interface{}{id}}) } +func (cs *clusterStoreRecorder) IsRemoved(id uint64) bool { return false } + +type removedClusterStore struct { + removed map[uint64]bool +} + +func (cs *removedClusterStore) Add(m Member) {} +func (cs *removedClusterStore) Get() Cluster { return Cluster{} } +func (cs *removedClusterStore) Remove(id uint64) {} +func (cs *removedClusterStore) IsRemoved(id uint64) bool { return cs.removed[id] } func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer { peers := make([]raft.Peer, len(ids))