From d051af4d3d1a70a9c1737f1af1e3fb3c21f0d204 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 30 Sep 2014 15:14:44 -0700 Subject: [PATCH] etcdserver: apply config change on cluster store --- etcdserver/cluster_store.go | 7 ++-- etcdserver/cluster_store_test.go | 22 ++++++++++++ etcdserver/etcdhttp/http_test.go | 2 ++ etcdserver/server.go | 36 ++++++++++++++----- etcdserver/server_test.go | 61 +++++++++++++++++++++++--------- 5 files changed, 101 insertions(+), 27 deletions(-) diff --git a/etcdserver/cluster_store.go b/etcdserver/cluster_store.go index f925c5d1a..bd52006dd 100644 --- a/etcdserver/cluster_store.go +++ b/etcdserver/cluster_store.go @@ -16,6 +16,7 @@ const ( ) type ClusterStore interface { + Create(m Member) Get() Cluster Delete(id int64) } @@ -27,14 +28,14 @@ type clusterStore struct { func NewClusterStore(st store.Store, c Cluster) ClusterStore { cls := &clusterStore{Store: st} for _, m := range c { - cls.add(*m) + cls.Create(*m) } return cls } -// add puts a new Member into the store. +// Create puts a new Member into the store. // A Member with a matching id must not exist. -func (s *clusterStore) add(m Member) { +func (s *clusterStore) Create(m Member) { b, err := json.Marshal(m) if err != nil { log.Panicf("marshal peer info error: %v", err) diff --git a/etcdserver/cluster_store_test.go b/etcdserver/cluster_store_test.go index aeec05e85..328be8984 100644 --- a/etcdserver/cluster_store_test.go +++ b/etcdserver/cluster_store_test.go @@ -9,6 +9,28 @@ import ( "github.com/coreos/etcd/store" ) +func TestClusterStoreCreate(t *testing.T) { + st := &storeRecorder{} + ps := &clusterStore{Store: st} + ps.Create(Member{Name: "node", ID: 1}) + + wactions := []action{ + { + name: "Create", + params: []interface{}{ + machineKVPrefix + "1", + false, + `{"ID":1,"Name":"node","PeerURLs":null,"ClientURLs":null}`, + false, + store.Permanent, + }, + }, + } + if g := st.Action(); !reflect.DeepEqual(g, wactions) { + t.Error("actions = %v, want %v", g, wactions) + } +} + func TestClusterStoreGet(t *testing.T) { tests := []struct { mems []Member diff --git a/etcdserver/etcdhttp/http_test.go b/etcdserver/etcdhttp/http_test.go index 48fa604f6..bffb8af45 100644 --- a/etcdserver/etcdhttp/http_test.go +++ b/etcdserver/etcdhttp/http_test.go @@ -1238,6 +1238,8 @@ type fakeCluster struct { members []etcdserver.Member } +func (c *fakeCluster) Create(m etcdserver.Member) { return } + func (c *fakeCluster) Get() etcdserver.Cluster { cl := &etcdserver.Cluster{} cl.AddSlice(c.members) diff --git a/etcdserver/server.go b/etcdserver/server.go index f9995519e..bb0e533fd 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -250,13 +250,13 @@ func (s *EtcdServer) run() { if err := r.Unmarshal(e.Data); err != nil { panic("TODO: this is bad, what do we do about it?") } - s.w.Trigger(r.ID, s.apply(r)) + s.w.Trigger(r.ID, s.applyRequest(r)) case raftpb.EntryConfChange: var cc raftpb.ConfChange if err := cc.Unmarshal(e.Data); err != nil { panic("TODO: this is bad, what do we do about it?") } - s.node.ApplyConfChange(cc) + s.applyConfChange(cc) s.w.Trigger(cc.ID, nil) default: panic("unexpected entry type") @@ -360,17 +360,21 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { } } -func (s *EtcdServer) AddNode(ctx context.Context, id int64, context []byte) error { +func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error { + b, err := json.Marshal(memb) + if err != nil { + return err + } cc := raftpb.ConfChange{ ID: GenID(), Type: raftpb.ConfChangeAddNode, - NodeID: id, - Context: context, + NodeID: memb.ID, + Context: b, } return s.configure(ctx, cc) } -func (s *EtcdServer) RemoveNode(ctx context.Context, id int64) error { +func (s *EtcdServer) RemoveMember(ctx context.Context, id int64) error { cc := raftpb.ConfChange{ ID: GenID(), Type: raftpb.ConfChangeRemoveNode, @@ -477,9 +481,9 @@ func getExpirationTime(r *pb.Request) time.Time { return t } -// apply interprets r as a call to store.X and returns a Response interpreted +// applyRequest interprets r as a call to store.X and returns a Response interpreted // from store.Event -func (s *EtcdServer) apply(r pb.Request) Response { +func (s *EtcdServer) applyRequest(r pb.Request) Response { f := func(ev *store.Event, err error) Response { return Response{Event: ev, err: err} } @@ -518,6 +522,22 @@ func (s *EtcdServer) apply(r pb.Request) Response { } } +func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) { + s.node.ApplyConfChange(cc) + switch cc.Type { + case raftpb.ConfChangeAddNode: + var m Member + if err := json.Unmarshal(cc.Context, &m); err != nil { + panic("unexpected unmarshal error") + } + s.ClusterStore.Create(m) + case raftpb.ConfChangeRemoveNode: + s.ClusterStore.Delete(cc.NodeID) + default: + panic("unexpected ConfChange type") + } +} + // TODO: non-blocking snapshot func (s *EtcdServer) snapshot(snapi int64, snapnodes []int64) { d, err := s.store.Save() diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 8ac6b99bc..887dad0e8 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -128,7 +128,7 @@ func TestDoBadLocalAction(t *testing.T) { } } -func TestApply(t *testing.T) { +func TestApplyRequest(t *testing.T) { tests := []struct { req pb.Request @@ -356,7 +356,7 @@ func TestApply(t *testing.T) { for i, tt := range tests { st := &storeRecorder{} srv := &EtcdServer{store: st} - resp := srv.apply(tt.req) + resp := srv.applyRequest(tt.req) if !reflect.DeepEqual(resp, tt.wresp) { t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp) @@ -786,17 +786,20 @@ func TestRecvSlowSnapshot(t *testing.T) { } } -// TestAddNode tests AddNode can propose and perform node addition. -func TestAddNode(t *testing.T) { +// TestAddMember tests AddMember can propose and perform node addition. +func TestAddMember(t *testing.T) { n := newNodeConfChangeCommitterRecorder() + cs := &clusterStoreRecorder{} s := &EtcdServer{ - node: n, - store: &storeRecorder{}, - send: func(_ []raftpb.Message) {}, - storage: &storageRecorder{}, + node: n, + store: &storeRecorder{}, + send: func(_ []raftpb.Message) {}, + storage: &storageRecorder{}, + ClusterStore: cs, } s.start() - s.AddNode(context.TODO(), 1, []byte("foo")) + m := Member{ID: 1, PeerURLs: []string{"foo"}} + s.AddMember(context.TODO(), m) gaction := n.Action() s.Stop() @@ -804,19 +807,26 @@ func TestAddNode(t *testing.T) { if !reflect.DeepEqual(gaction, wactions) { t.Errorf("action = %v, want %v", gaction, wactions) } + wcsactions := []action{{name: "Create", params: []interface{}{m}}} + if g := cs.Action(); !reflect.DeepEqual(g, wcsactions) { + t.Errorf("csaction = %v, want %v", g, wcsactions) + } } -// TestRemoveNode tests RemoveNode can propose and perform node removal. -func TestRemoveNode(t *testing.T) { +// TestRemoveMember tests RemoveMember can propose and perform node removal. +func TestRemoveMember(t *testing.T) { n := newNodeConfChangeCommitterRecorder() + cs := &clusterStoreRecorder{} s := &EtcdServer{ - node: n, - store: &storeRecorder{}, - send: func(_ []raftpb.Message) {}, - storage: &storageRecorder{}, + node: n, + store: &storeRecorder{}, + send: func(_ []raftpb.Message) {}, + storage: &storageRecorder{}, + ClusterStore: cs, } s.start() - s.RemoveNode(context.TODO(), 1) + id := int64(1) + s.RemoveMember(context.TODO(), id) gaction := n.Action() s.Stop() @@ -824,6 +834,10 @@ func TestRemoveNode(t *testing.T) { if !reflect.DeepEqual(gaction, wactions) { t.Errorf("action = %v, want %v", gaction, wactions) } + wcsactions := []action{{name: "Delete", params: []interface{}{id}}} + if g := cs.Action(); !reflect.DeepEqual(g, wcsactions) { + t.Errorf("csaction = %v, want %v", g, wcsactions) + } } // TestServerStopItself tests that if node sends out Ready with ShouldStop, @@ -1230,6 +1244,21 @@ func (w *waitWithResponse) Register(id int64) <-chan interface{} { } func (w *waitWithResponse) Trigger(id int64, x interface{}) {} +type clusterStoreRecorder struct { + recorder +} + +func (cs *clusterStoreRecorder) Create(m Member) { + cs.record(action{name: "Create", params: []interface{}{m}}) +} +func (cs *clusterStoreRecorder) Get() Cluster { + cs.record(action{name: "Get"}) + return nil +} +func (cs *clusterStoreRecorder) Delete(id int64) { + cs.record(action{name: "Delete", params: []interface{}{id}}) +} + func mustClusterStore(t *testing.T, membs []Member) ClusterStore { c := Cluster{} if err := c.AddSlice(membs); err != nil {