etcdserver: record removed member to check incoming message

This commit is contained in:
Yicheng Qin 2014-10-21 11:04:42 -07:00
parent b40d30a8d2
commit 7498234e40
7 changed files with 120 additions and 30 deletions

View File

@ -43,6 +43,7 @@ type ClusterStore interface {
Add(m Member)
Get() Cluster
Remove(id uint64)
IsRemoved(id uint64) bool
}
type clusterStore struct {
@ -121,10 +122,26 @@ func nodeToMember(n *store.NodeExtern) (Member, error) {
// Remove removes a member from the store.
// The given id MUST exist.
func (s *clusterStore) Remove(id uint64) {
p := s.Get().FindID(id).storeKey()
if _, err := s.Store.Delete(p, true, true); err != nil {
if _, err := s.Store.Delete(Member{ID: id}.storeKey(), true, true); err != nil {
log.Panicf("delete peer should never fail: %v", err)
}
if _, err := s.Store.Create(removedMemberStoreKey(id), false, "", false, store.Permanent); err != nil {
log.Panicf("unexpected creating removed member error: %v", err)
}
}
func (s *clusterStore) IsRemoved(id uint64) bool {
_, err := s.Store.Get(removedMemberStoreKey(id), false, false)
switch v := err.(type) {
case nil:
return true
case *etcdErr.Error:
if v.ErrorCode == etcdErr.EcodeKeyNotFound {
return false
}
}
log.Panicf("unexpected getting removed member error: %v", err)
return false
}
// Sender creates the default production sender used to transport raft messages
@ -206,6 +223,10 @@ func httpPost(c *http.Client, url string, cid uint64, data []byte) bool {
// TODO: shutdown the etcdserver gracefully?
log.Panicf("clusterID mismatch")
return false
case http.StatusForbidden:
// TODO: stop the server
log.Panicf("the member has been removed")
return false
case http.StatusNoContent:
return true
default:

View File

@ -108,15 +108,39 @@ func TestClusterStoreGet(t *testing.T) {
}
}
func TestClusterStoreDelete(t *testing.T) {
st := newStoreGetAllAndDeleteRecorder()
func TestClusterStoreRemove(t *testing.T) {
st := &storeRecorder{}
cs := &clusterStore{Store: st}
cs.Add(newTestMember(1, nil, "node1", nil))
cs.Remove(1)
wdeletes := []string{path.Join(storeMembersPrefix, "1")}
if !reflect.DeepEqual(st.deletes, wdeletes) {
t.Errorf("deletes = %v, want %v", st.deletes, wdeletes)
wactions := []action{
{name: "Delete", params: []interface{}{Member{ID: 1}.storeKey(), true, true}},
{name: "Create", params: []interface{}{removedMemberStoreKey(1), false, "", false, store.Permanent}},
}
if !reflect.DeepEqual(st.Action(), wactions) {
t.Errorf("actions = %v, want %v", st.Action(), wactions)
}
}
func TestClusterStoreIsRemovedFalse(t *testing.T) {
st := &errStoreRecorder{err: etcdErr.NewError(etcdErr.EcodeKeyNotFound, "", 0)}
cs := clusterStore{Store: st}
if ok := cs.IsRemoved(1); ok != false {
t.Errorf("IsRemoved = %v, want %v", ok, false)
}
}
func TestClusterStoreIsRemovedTrue(t *testing.T) {
st := &storeRecorder{}
cs := &clusterStore{Store: st}
if ok := cs.IsRemoved(1); ok != true {
t.Errorf("IsRemoved = %v, want %v", ok, true)
}
wactions := []action{
{name: "Get", params: []interface{}{removedMemberStoreKey(1), false, false}},
}
if !reflect.DeepEqual(st.Action(), wactions) {
t.Errorf("actions = %v, want %v", st.Action(), wactions)
}
}
@ -201,20 +225,6 @@ func newGetAllStore() *getAllStore {
return &getAllStore{store.New()}
}
type storeGetAllAndDeleteRecorder struct {
*getAllStore
deletes []string
}
func newStoreGetAllAndDeleteRecorder() *storeGetAllAndDeleteRecorder {
return &storeGetAllAndDeleteRecorder{getAllStore: newGetAllStore()}
}
func (s *storeGetAllAndDeleteRecorder) Delete(key string, _, _ bool) (*store.Event, error) {
s.deletes = append(s.deletes, key)
return nil, nil
}
func newTestMember(id uint64, peerURLs []string, name string, clientURLs []string) Member {
return Member{
ID: id,

View File

@ -243,7 +243,12 @@ func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
}
if err := h.server.Process(context.TODO(), m); err != nil {
log.Println("etcdhttp: error processing raft message:", err)
writeError(w, err)
switch err {
case etcdserver.ErrRemoved:
http.Error(w, "unexpected message from removed node", http.StatusForbidden)
default:
writeError(w, err)
}
return
}
w.WriteHeader(http.StatusNoContent)

View File

@ -922,7 +922,7 @@ func TestServeRaft(t *testing.T) {
http.StatusBadRequest,
},
{
// good request, etcdserver.Server error
// good request, etcdserver.Server internal error
"POST",
bytes.NewReader(
mustMarshalMsg(
@ -934,6 +934,19 @@ func TestServeRaft(t *testing.T) {
"0",
http.StatusInternalServerError,
},
{
// good request from removed member
"POST",
bytes.NewReader(
mustMarshalMsg(
t,
raftpb.Message{},
),
),
etcdserver.ErrRemoved,
"0",
http.StatusForbidden,
},
{
// good request
"POST",
@ -1654,3 +1667,5 @@ func (c *fakeCluster) Get() etcdserver.Cluster {
}
func (c *fakeCluster) Remove(id uint64) { return }
func (c *fakeCluster) IsRemoved(id uint64) bool { return false }

View File

@ -79,3 +79,7 @@ func parseMemberID(key string) uint64 {
}
return id
}
func removedMemberStoreKey(id uint64) string {
return path.Join(storeRemovedMembersPrefix, idAsHex(id))
}

View File

@ -56,11 +56,13 @@ const (
var (
ErrUnknownMethod = errors.New("etcdserver: unknown method")
ErrStopped = errors.New("etcdserver: server stopped")
ErrRemoved = errors.New("etcdserver: server removed")
ErrIDRemoved = errors.New("etcdserver: ID removed")
ErrIDExists = errors.New("etcdserver: ID exists")
ErrIDNotFound = errors.New("etcdserver: ID not found")
storeMembersPrefix = path.Join(StoreAdminPrefix, "members")
storeMembersPrefix = path.Join(StoreAdminPrefix, "members")
storeRemovedMembersPrefix = path.Join(StoreAdminPrefix, "removed_members")
)
func init() {
@ -265,6 +267,9 @@ func (s *EtcdServer) start() {
}
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
if s.ClusterStore.IsRemoved(m.From) {
return ErrRemoved
}
return s.node.Step(ctx, m)
}
@ -585,7 +590,7 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
}
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes []uint64) error {
if err := checkConfChange(cc, nodes); err != nil {
if err := s.checkConfChange(cc, nodes); err != nil {
cc.NodeID = raft.None
s.node.ApplyConfChange(cc)
return err
@ -607,7 +612,10 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes []uint64) error
return nil
}
func checkConfChange(cc raftpb.ConfChange, nodes []uint64) error {
func (s *EtcdServer) checkConfChange(cc raftpb.ConfChange, nodes []uint64) error {
if s.ClusterStore.IsRemoved(cc.NodeID) {
return ErrIDRemoved
}
switch cc.Type {
case raftpb.ConfChangeAddNode:
if containsUint64(nodes, cc.NodeID) {

View File

@ -386,10 +386,25 @@ func TestApplyRequest(t *testing.T) {
// TODO: test ErrIDRemoved
func TestApplyConfChangeError(t *testing.T) {
nodes := []uint64{1, 2, 3}
removed := map[uint64]bool{4: true}
tests := []struct {
cc raftpb.ConfChange
werr error
}{
{
raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: 4,
},
ErrIDRemoved,
},
{
raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
NodeID: 4,
},
ErrIDRemoved,
},
{
raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
@ -407,8 +422,10 @@ func TestApplyConfChangeError(t *testing.T) {
}
for i, tt := range tests {
n := &nodeRecorder{}
cs := &removedClusterStore{removed: removed}
srv := &EtcdServer{
node: n,
node: n,
ClusterStore: cs,
}
err := srv.applyConfChange(tt.cc, nodes)
if err != tt.werr {
@ -950,8 +967,8 @@ func TestPublish(t *testing.T) {
t.Errorf("method = %s, want PUT", r.Method)
}
wm := Member{ID: 1, Attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}}
if r.Path != wm.storeKey()+attributesSuffix {
t.Errorf("path = %s, want %s", r.Path, wm.storeKey()+attributesSuffix)
if w := wm.storeKey() + attributesSuffix; r.Path != w {
t.Errorf("path = %s, want %s", r.Path, w)
}
var gattr Attributes
if err := json.Unmarshal([]byte(r.Val), &gattr); err != nil {
@ -1312,6 +1329,16 @@ func (cs *clusterStoreRecorder) Get() Cluster {
func (cs *clusterStoreRecorder) Remove(id uint64) {
cs.record(action{name: "Remove", params: []interface{}{id}})
}
func (cs *clusterStoreRecorder) IsRemoved(id uint64) bool { return false }
type removedClusterStore struct {
removed map[uint64]bool
}
func (cs *removedClusterStore) Add(m Member) {}
func (cs *removedClusterStore) Get() Cluster { return Cluster{} }
func (cs *removedClusterStore) Remove(id uint64) {}
func (cs *removedClusterStore) IsRemoved(id uint64) bool { return cs.removed[id] }
func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer {
peers := make([]raft.Peer, len(ids))