From 98406af448ba08f14837e82d7528acf7f946e588 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 5 Nov 2014 15:56:43 -0800 Subject: [PATCH 1/2] cluster: separate out membersFromStore from newClusterFromStore --- etcdserver/cluster.go | 59 +++++++++++++++++++++++-------------------- etcdserver/member.go | 1 + 2 files changed, 33 insertions(+), 27 deletions(-) diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index 2d76ac3c2..baa6e7d28 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -89,33 +89,7 @@ func NewClusterFromString(token string, cluster string) (*Cluster, error) { func NewClusterFromStore(token string, st store.Store) *Cluster { c := newCluster(token) c.store = st - - e, err := c.store.Get(storeMembersPrefix, true, true) - if err != nil { - if isKeyNotFound(err) { - return c - } - log.Panicf("get storeMembers should never fail: %v", err) - } - for _, n := range e.Node.Nodes { - m, err := nodeToMember(n) - if err != nil { - log.Panicf("nodeToMember should never fail: %v", err) - } - c.members[m.ID] = m - } - - e, err = c.store.Get(storeRemovedMembersPrefix, true, true) - if err != nil { - if isKeyNotFound(err) { - return c - } - log.Panicf("get storeRemovedMembers should never fail: %v", err) - } - for _, n := range e.Node.Nodes { - c.removed[mustParseMemberIDFromKey(n.Key)] = true - } - + c.members, c.removed = membersFromStore(c.store) return c } @@ -322,6 +296,37 @@ func nodeToMember(n *store.NodeExtern) (*Member, error) { return m, nil } +func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool) { + members := make(map[types.ID]*Member) + removed := make(map[types.ID]bool) + e, err := st.Get(storeMembersPrefix, true, true) + if err != nil { + if isKeyNotFound(err) { + return members, removed + } + log.Panicf("get storeMembers should never fail: %v", err) + } + for _, n := range e.Node.Nodes { + m, err := nodeToMember(n) + if err != nil { + log.Panicf("nodeToMember should never fail: %v", err) + } + members[m.ID] = m + } + + e, err = st.Get(storeRemovedMembersPrefix, true, true) + if err != nil { + if isKeyNotFound(err) { + return members, removed + } + log.Panicf("get storeRemovedMembers should never fail: %v", err) + } + for _, n := range e.Node.Nodes { + removed[mustParseMemberIDFromKey(n.Key)] = true + } + return members, removed +} + func isKeyNotFound(err error) bool { e, ok := err.(*etcdErr.Error) return ok && e.ErrorCode == etcdErr.EcodeKeyNotFound diff --git a/etcdserver/member.go b/etcdserver/member.go index cce6a5a58..39d7954a9 100644 --- a/etcdserver/member.go +++ b/etcdserver/member.go @@ -45,6 +45,7 @@ type Member struct { ID types.ID `json:"id"` RaftAttributes Attributes + verified bool } // newMember creates a Member without an ID and generates one based on the From 99b1af40c62773b4a74f53e2433416256320859b Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 5 Nov 2014 17:16:42 -0800 Subject: [PATCH 2/2] etcdserver: move config validation to cluster --- etcdserver/cluster.go | 22 ++++++++++++++++++++++ etcdserver/cluster_test.go | 9 ++++----- etcdserver/member.go | 1 - etcdserver/server.go | 29 +++++------------------------ etcdserver/server_test.go | 30 ++++++++++++++++++------------ 5 files changed, 49 insertions(+), 42 deletions(-) diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index baa6e7d28..ca35b3b89 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -31,6 +31,7 @@ import ( etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/pkg/flags" "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/store" ) @@ -239,6 +240,27 @@ func (c *Cluster) SetID(id types.ID) { c.id = id } func (c *Cluster) SetStore(st store.Store) { c.store = st } +func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { + appliedMembers, appliedRemoved := membersFromStore(c.store) + + if appliedRemoved[types.ID(cc.NodeID)] { + return ErrIDRemoved + } + switch cc.Type { + case raftpb.ConfChangeAddNode: + if appliedMembers[types.ID(cc.NodeID)] != nil { + return ErrIDExists + } + case raftpb.ConfChangeRemoveNode: + if appliedMembers[types.ID(cc.NodeID)] == nil { + return ErrIDNotFound + } + default: + log.Panicf("ConfChange type should be either AddNode or RemoveNode") + } + return nil +} + // AddMember puts a new Member into the store. // A Member with a matching id must not exist. func (c *Cluster) AddMember(m *Member) { diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go index 7d016b196..96ea42cbf 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/cluster_test.go @@ -93,13 +93,11 @@ func TestClusterFromStore(t *testing.T) { }, } for i, tt := range tests { - st := store.New() hc := newTestCluster(nil) - hc.SetStore(st) for _, m := range tt.mems { hc.AddMember(&m) } - c := NewClusterFromStore("abc", st) + c := NewClusterFromStore("abc", hc.store) if c.token != "abc" { t.Errorf("#%d: token = %v, want %v", i, c.token, "abc") } @@ -535,8 +533,9 @@ func TestNodeToMember(t *testing.T) { func newTestCluster(membs []Member) *Cluster { c := &Cluster{members: make(map[types.ID]*Member), removed: make(map[types.ID]bool)} - for i, m := range membs { - c.members[m.ID] = &membs[i] + c.store = store.New() + for i := range membs { + c.AddMember(&membs[i]) } return c } diff --git a/etcdserver/member.go b/etcdserver/member.go index 39d7954a9..cce6a5a58 100644 --- a/etcdserver/member.go +++ b/etcdserver/member.go @@ -45,7 +45,6 @@ type Member struct { ID types.ID `json:"id"` RaftAttributes Attributes - verified bool } // newMember creates a Member without an ID and generates one based on the diff --git a/etcdserver/server.go b/etcdserver/server.go index 584f269ee..0dbe65a3a 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -328,7 +328,7 @@ func (s *EtcdServer) run() { // race them. // TODO: apply configuration change into ClusterStore. if len(rd.CommittedEntries) != 0 { - appliedi = s.apply(rd.CommittedEntries, nodes) + appliedi = s.apply(rd.CommittedEntries) } if rd.Snapshot.Index > snapi { @@ -559,7 +559,7 @@ func getExpirationTime(r *pb.Request) time.Time { return t } -func (s *EtcdServer) apply(es []raftpb.Entry, nodes []uint64) uint64 { +func (s *EtcdServer) apply(es []raftpb.Entry) uint64 { var applied uint64 for i := range es { e := es[i] @@ -571,7 +571,7 @@ func (s *EtcdServer) apply(es []raftpb.Entry, nodes []uint64) uint64 { case raftpb.EntryConfChange: var cc raftpb.ConfChange pbutil.MustUnmarshal(&cc, e.Data) - s.w.Trigger(cc.ID, s.applyConfChange(cc, nodes)) + s.w.Trigger(cc.ID, s.applyConfChange(cc)) default: log.Panicf("entry type should be either EntryNormal or EntryConfChange") } @@ -633,8 +633,8 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response { } } -func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes []uint64) error { - if err := s.checkConfChange(cc, nodes); err != nil { +func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error { + if err := s.Cluster.ValidateConfigurationChange(cc); err != nil { cc.NodeID = raft.None s.node.ApplyConfChange(cc) return err @@ -659,25 +659,6 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes []uint64) error return nil } -func (s *EtcdServer) checkConfChange(cc raftpb.ConfChange, nodes []uint64) error { - if s.Cluster.IsIDRemoved(types.ID(cc.NodeID)) { - return ErrIDRemoved - } - switch cc.Type { - case raftpb.ConfChangeAddNode: - if containsUint64(nodes, cc.NodeID) { - return ErrIDExists - } - case raftpb.ConfChangeRemoveNode: - if !containsUint64(nodes, cc.NodeID) { - return ErrIDNotFound - } - default: - log.Panicf("ConfChange type should be either AddNode or RemoveNode") - } - return nil -} - // TODO: non-blocking snapshot func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) { d, err := s.store.Save() diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index d621b86ce..995bce519 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -421,8 +421,13 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) { // TODO: test ErrIDRemoved func TestApplyConfChangeError(t *testing.T) { - nodes := []uint64{1, 2, 3} - removed := map[types.ID]bool{4: true} + cl := newCluster("") + cl.SetStore(store.New()) + for i := 1; i <= 4; i++ { + cl.AddMember(&Member{ID: types.ID(i)}) + } + cl.RemoveMember(4) + tests := []struct { cc raftpb.ConfChange werr error @@ -458,12 +463,11 @@ func TestApplyConfChangeError(t *testing.T) { } for i, tt := range tests { n := &nodeRecorder{} - cl := &Cluster{removed: removed} srv := &EtcdServer{ node: n, Cluster: cl, } - err := srv.applyConfChange(tt.cc, nodes) + err := srv.applyConfChange(tt.cc) if err != tt.werr { t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr) } @@ -506,11 +510,12 @@ func testServer(t *testing.T, ns uint64) { n := raft.StartNode(id, members, 10, 1) tk := time.NewTicker(10 * time.Millisecond) defer tk.Stop() + st := store.New() cl := newCluster("abc") - cl.SetStore(&storeRecorder{}) + cl.SetStore(st) srv := &EtcdServer{ node: n, - store: store.New(), + store: st, send: send, storage: &storageRecorder{}, Ticker: tk.C, @@ -536,8 +541,8 @@ func testServer(t *testing.T, ns uint64) { g, w := resp.Event.Node, &store.NodeExtern{ Key: "/foo", - ModifiedIndex: uint64(i), - CreatedIndex: uint64(i), + ModifiedIndex: uint64(i) + 2*ns, + CreatedIndex: uint64(i) + 2*ns, Value: stringp("bar"), } @@ -576,7 +581,7 @@ func TestDoProposal(t *testing.T) { // this makes <-tk always successful, which accelerates internal clock close(tk) cl := newCluster("abc") - cl.SetStore(&storeRecorder{}) + cl.SetStore(store.New()) srv := &EtcdServer{ node: n, store: st, @@ -833,13 +838,15 @@ func TestTriggerSnap(t *testing.T) { n.Campaign(ctx) st := &storeRecorder{} p := &storageRecorder{} + cl := newCluster("abc") + cl.SetStore(store.New()) s := &EtcdServer{ store: st, send: func(_ []raftpb.Message) {}, storage: p, node: n, snapCount: 10, - Cluster: &Cluster{}, + Cluster: cl, } s.start() @@ -928,7 +935,7 @@ func TestAddMember(t *testing.T) { }, } cl := newTestCluster(nil) - cl.SetStore(&storeRecorder{}) + cl.SetStore(store.New()) s := &EtcdServer{ node: n, store: &storeRecorder{}, @@ -964,7 +971,6 @@ func TestRemoveMember(t *testing.T) { }, } cl := newTestCluster([]Member{{ID: 1234}}) - cl.SetStore(&storeRecorder{}) s := &EtcdServer{ node: n, store: &storeRecorder{},