etcdserver: apply config change on cluster store

This commit is contained in:
Yicheng Qin 2014-09-30 15:14:44 -07:00
parent 2e0fec7a84
commit d051af4d3d
5 changed files with 101 additions and 27 deletions

View File

@ -16,6 +16,7 @@ const (
)
type ClusterStore interface {
Create(m Member)
Get() Cluster
Delete(id int64)
}
@ -27,14 +28,14 @@ type clusterStore struct {
func NewClusterStore(st store.Store, c Cluster) ClusterStore {
cls := &clusterStore{Store: st}
for _, m := range c {
cls.add(*m)
cls.Create(*m)
}
return cls
}
// add puts a new Member into the store.
// Create puts a new Member into the store.
// A Member with a matching id must not exist.
func (s *clusterStore) add(m Member) {
func (s *clusterStore) Create(m Member) {
b, err := json.Marshal(m)
if err != nil {
log.Panicf("marshal peer info error: %v", err)

View File

@ -9,6 +9,28 @@ import (
"github.com/coreos/etcd/store"
)
func TestClusterStoreCreate(t *testing.T) {
st := &storeRecorder{}
ps := &clusterStore{Store: st}
ps.Create(Member{Name: "node", ID: 1})
wactions := []action{
{
name: "Create",
params: []interface{}{
machineKVPrefix + "1",
false,
`{"ID":1,"Name":"node","PeerURLs":null,"ClientURLs":null}`,
false,
store.Permanent,
},
},
}
if g := st.Action(); !reflect.DeepEqual(g, wactions) {
t.Error("actions = %v, want %v", g, wactions)
}
}
func TestClusterStoreGet(t *testing.T) {
tests := []struct {
mems []Member

View File

@ -1238,6 +1238,8 @@ type fakeCluster struct {
members []etcdserver.Member
}
func (c *fakeCluster) Create(m etcdserver.Member) { return }
func (c *fakeCluster) Get() etcdserver.Cluster {
cl := &etcdserver.Cluster{}
cl.AddSlice(c.members)

View File

@ -250,13 +250,13 @@ func (s *EtcdServer) run() {
if err := r.Unmarshal(e.Data); err != nil {
panic("TODO: this is bad, what do we do about it?")
}
s.w.Trigger(r.ID, s.apply(r))
s.w.Trigger(r.ID, s.applyRequest(r))
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
if err := cc.Unmarshal(e.Data); err != nil {
panic("TODO: this is bad, what do we do about it?")
}
s.node.ApplyConfChange(cc)
s.applyConfChange(cc)
s.w.Trigger(cc.ID, nil)
default:
panic("unexpected entry type")
@ -360,17 +360,21 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
}
}
func (s *EtcdServer) AddNode(ctx context.Context, id int64, context []byte) error {
func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error {
b, err := json.Marshal(memb)
if err != nil {
return err
}
cc := raftpb.ConfChange{
ID: GenID(),
Type: raftpb.ConfChangeAddNode,
NodeID: id,
Context: context,
NodeID: memb.ID,
Context: b,
}
return s.configure(ctx, cc)
}
func (s *EtcdServer) RemoveNode(ctx context.Context, id int64) error {
func (s *EtcdServer) RemoveMember(ctx context.Context, id int64) error {
cc := raftpb.ConfChange{
ID: GenID(),
Type: raftpb.ConfChangeRemoveNode,
@ -477,9 +481,9 @@ func getExpirationTime(r *pb.Request) time.Time {
return t
}
// apply interprets r as a call to store.X and returns a Response interpreted
// applyRequest interprets r as a call to store.X and returns a Response interpreted
// from store.Event
func (s *EtcdServer) apply(r pb.Request) Response {
func (s *EtcdServer) applyRequest(r pb.Request) Response {
f := func(ev *store.Event, err error) Response {
return Response{Event: ev, err: err}
}
@ -518,6 +522,22 @@ func (s *EtcdServer) apply(r pb.Request) Response {
}
}
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) {
s.node.ApplyConfChange(cc)
switch cc.Type {
case raftpb.ConfChangeAddNode:
var m Member
if err := json.Unmarshal(cc.Context, &m); err != nil {
panic("unexpected unmarshal error")
}
s.ClusterStore.Create(m)
case raftpb.ConfChangeRemoveNode:
s.ClusterStore.Delete(cc.NodeID)
default:
panic("unexpected ConfChange type")
}
}
// TODO: non-blocking snapshot
func (s *EtcdServer) snapshot(snapi int64, snapnodes []int64) {
d, err := s.store.Save()

View File

@ -128,7 +128,7 @@ func TestDoBadLocalAction(t *testing.T) {
}
}
func TestApply(t *testing.T) {
func TestApplyRequest(t *testing.T) {
tests := []struct {
req pb.Request
@ -356,7 +356,7 @@ func TestApply(t *testing.T) {
for i, tt := range tests {
st := &storeRecorder{}
srv := &EtcdServer{store: st}
resp := srv.apply(tt.req)
resp := srv.applyRequest(tt.req)
if !reflect.DeepEqual(resp, tt.wresp) {
t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
@ -786,17 +786,20 @@ func TestRecvSlowSnapshot(t *testing.T) {
}
}
// TestAddNode tests AddNode can propose and perform node addition.
func TestAddNode(t *testing.T) {
// TestAddMember tests AddMember can propose and perform node addition.
func TestAddMember(t *testing.T) {
n := newNodeConfChangeCommitterRecorder()
cs := &clusterStoreRecorder{}
s := &EtcdServer{
node: n,
store: &storeRecorder{},
send: func(_ []raftpb.Message) {},
storage: &storageRecorder{},
node: n,
store: &storeRecorder{},
send: func(_ []raftpb.Message) {},
storage: &storageRecorder{},
ClusterStore: cs,
}
s.start()
s.AddNode(context.TODO(), 1, []byte("foo"))
m := Member{ID: 1, PeerURLs: []string{"foo"}}
s.AddMember(context.TODO(), m)
gaction := n.Action()
s.Stop()
@ -804,19 +807,26 @@ func TestAddNode(t *testing.T) {
if !reflect.DeepEqual(gaction, wactions) {
t.Errorf("action = %v, want %v", gaction, wactions)
}
wcsactions := []action{{name: "Create", params: []interface{}{m}}}
if g := cs.Action(); !reflect.DeepEqual(g, wcsactions) {
t.Errorf("csaction = %v, want %v", g, wcsactions)
}
}
// TestRemoveNode tests RemoveNode can propose and perform node removal.
func TestRemoveNode(t *testing.T) {
// TestRemoveMember tests RemoveMember can propose and perform node removal.
func TestRemoveMember(t *testing.T) {
n := newNodeConfChangeCommitterRecorder()
cs := &clusterStoreRecorder{}
s := &EtcdServer{
node: n,
store: &storeRecorder{},
send: func(_ []raftpb.Message) {},
storage: &storageRecorder{},
node: n,
store: &storeRecorder{},
send: func(_ []raftpb.Message) {},
storage: &storageRecorder{},
ClusterStore: cs,
}
s.start()
s.RemoveNode(context.TODO(), 1)
id := int64(1)
s.RemoveMember(context.TODO(), id)
gaction := n.Action()
s.Stop()
@ -824,6 +834,10 @@ func TestRemoveNode(t *testing.T) {
if !reflect.DeepEqual(gaction, wactions) {
t.Errorf("action = %v, want %v", gaction, wactions)
}
wcsactions := []action{{name: "Delete", params: []interface{}{id}}}
if g := cs.Action(); !reflect.DeepEqual(g, wcsactions) {
t.Errorf("csaction = %v, want %v", g, wcsactions)
}
}
// TestServerStopItself tests that if node sends out Ready with ShouldStop,
@ -1230,6 +1244,21 @@ func (w *waitWithResponse) Register(id int64) <-chan interface{} {
}
func (w *waitWithResponse) Trigger(id int64, x interface{}) {}
type clusterStoreRecorder struct {
recorder
}
func (cs *clusterStoreRecorder) Create(m Member) {
cs.record(action{name: "Create", params: []interface{}{m}})
}
func (cs *clusterStoreRecorder) Get() Cluster {
cs.record(action{name: "Get"})
return nil
}
func (cs *clusterStoreRecorder) Delete(id int64) {
cs.record(action{name: "Delete", params: []interface{}{id}})
}
func mustClusterStore(t *testing.T, membs []Member) ClusterStore {
c := Cluster{}
if err := c.AddSlice(membs); err != nil {