From aaffb9eb78cd6cd75947bf57ba397abfa3932964 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 19 Sep 2014 12:36:19 -0700 Subject: [PATCH] etcdserver: add AddNode, RemoveNode AddNode and RemoveNode is used to propose config change to the cluster. If succeeds, it will add/remove node from the cluster. --- etcdserver/etcdserverpb/etcdserver.pb.go | 168 +++++++++++++++++++++++ etcdserver/etcdserverpb/etcdserver.proto | 7 + etcdserver/server.go | 81 ++++++++++- etcdserver/server_test.go | 81 ++++++++++- 4 files changed, 325 insertions(+), 12 deletions(-) diff --git a/etcdserver/etcdserverpb/etcdserver.pb.go b/etcdserver/etcdserverpb/etcdserver.pb.go index a4176a31f..c299e30b3 100644 --- a/etcdserver/etcdserverpb/etcdserver.pb.go +++ b/etcdserver/etcdserverpb/etcdserver.pb.go @@ -10,6 +10,7 @@ It has these top-level messages: Request + Config */ package etcdserverpb @@ -50,6 +51,18 @@ func (m *Request) Reset() { *m = Request{} } func (m *Request) String() string { return proto.CompactTextString(m) } func (*Request) ProtoMessage() {} +type Config struct { + Id int64 `protobuf:"varint,1,req,name=id" json:"id"` + Type int64 `protobuf:"varint,2,req,name=type" json:"type"` + NodeID int64 `protobuf:"varint,3,req,name=nodeID" json:"nodeID"` + Context []byte `protobuf:"bytes,4,opt,name=context" json:"context"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Config) Reset() { *m = Config{} } +func (m *Config) String() string { return proto.CompactTextString(m) } +func (*Config) ProtoMessage() {} + func init() { } func (m *Request) Unmarshal(data []byte) error { @@ -360,6 +373,115 @@ func (m *Request) Unmarshal(data []byte) error { } return nil } +func (m *Config) Unmarshal(data []byte) error { + l := len(data) + index := 0 + for index < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.Id |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.Type |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.NodeID |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 2 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := index + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Context = append(m.Context, data[index:postIndex]...) + index = postIndex + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + index -= sizeOfWire + skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:]) + if err != nil { + return err + } + if (index + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...) + index += skippy + } + } + return nil +} func (m *Request) Size() (n int) { var l int _ = l @@ -389,6 +511,19 @@ func (m *Request) Size() (n int) { } return n } +func (m *Config) Size() (n int) { + var l int + _ = l + n += 1 + sovEtcdserver(uint64(m.Id)) + n += 1 + sovEtcdserver(uint64(m.Type)) + n += 1 + sovEtcdserver(uint64(m.NodeID)) + l = len(m.Context) + n += 1 + l + sovEtcdserver(uint64(l)) + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} func sovEtcdserver(x uint64) (n int) { for { @@ -504,6 +639,39 @@ func (m *Request) MarshalTo(data []byte) (n int, err error) { } return i, nil } +func (m *Config) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *Config) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + data[i] = 0x8 + i++ + i = encodeVarintEtcdserver(data, i, uint64(m.Id)) + data[i] = 0x10 + i++ + i = encodeVarintEtcdserver(data, i, uint64(m.Type)) + data[i] = 0x18 + i++ + i = encodeVarintEtcdserver(data, i, uint64(m.NodeID)) + data[i] = 0x22 + i++ + i = encodeVarintEtcdserver(data, i, uint64(len(m.Context))) + i += copy(data[i:], m.Context) + if m.XXX_unrecognized != nil { + i += copy(data[i:], m.XXX_unrecognized) + } + return i, nil +} func encodeFixed64Etcdserver(data []byte, offset int, v uint64) int { data[offset] = uint8(v) data[offset+1] = uint8(v >> 8) diff --git a/etcdserver/etcdserverpb/etcdserver.proto b/etcdserver/etcdserverpb/etcdserver.proto index bb34ac250..10ba4cbf4 100644 --- a/etcdserver/etcdserverpb/etcdserver.proto +++ b/etcdserver/etcdserverpb/etcdserver.proto @@ -24,3 +24,10 @@ message Request { required bool quorum = 14 [(gogoproto.nullable) = false]; required int64 time = 15 [(gogoproto.nullable) = false]; } + +message Config { + required int64 id = 1 [(gogoproto.nullable) = false]; + required int64 type = 2 [(gogoproto.nullable) = false]; + required int64 nodeID = 3 [(gogoproto.nullable) = false]; + optional bytes context = 4 [(gogoproto.nullable) = false]; +} diff --git a/etcdserver/server.go b/etcdserver/server.go index 905213acd..8a864954f 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -19,6 +19,11 @@ const ( DefaultSnapCount = 10000 ) +const ( + configAddNode int64 = iota + configRemoveNode +) + var ( ErrUnknownMethod = errors.New("etcdserver: unknown method") ErrStopped = errors.New("etcdserver: server stopped") @@ -121,11 +126,23 @@ func (s *EtcdServer) run() { // care to apply entries in a single goroutine, and not // race them. for _, e := range rd.CommittedEntries { - var r pb.Request - if err := r.Unmarshal(e.Data); err != nil { - panic("TODO: this is bad, what do we do about it?") + switch e.Type { + case raft.EntryNormal: + var r pb.Request + if err := r.Unmarshal(e.Data); err != nil { + panic("TODO: this is bad, what do we do about it?") + } + s.w.Trigger(r.Id, s.applyRequest(r)) + case raft.EntryConfig: + var c pb.Config + if err := c.Unmarshal(e.Data); err != nil { + panic("TODO: this is bad, what do we do about it?") + } + s.applyConfig(c) + s.w.Trigger(c.Id, nil) + default: + panic("unsupported entry type") } - s.w.Trigger(r.Id, s.apply(r)) appliedi = e.Index } @@ -218,6 +235,46 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { } } +func (s *EtcdServer) AddNode(ctx context.Context, id int64, context []byte) error { + req := pb.Config{ + Id: GenID(), + Type: configAddNode, + NodeID: id, + Context: context, + } + return s.configure(ctx, req) +} + +func (s *EtcdServer) RemoveNode(ctx context.Context, id int64) error { + req := pb.Config{ + Id: GenID(), + Type: configRemoveNode, + NodeID: id, + } + return s.configure(ctx, req) +} + +// configure sends configuration change through consensus then performs it. +// It will block until the change is performed or there is an error. +func (s *EtcdServer) configure(ctx context.Context, r pb.Config) error { + data, err := r.Marshal() + if err != nil { + log.Printf("marshal request %#v error: %v", r, err) + return err + } + ch := s.w.Register(r.Id) + s.Node.Configure(ctx, data) + select { + case <-ch: + return nil + case <-ctx.Done(): + s.w.Trigger(r.Id, nil) // GC wait + return ctx.Err() + case <-s.done: + return ErrStopped + } +} + // sync proposes a SYNC request and is non-blocking. // This makes no guarantee that the request will be proposed or performed. // The request will be cancelled after the given timeout. @@ -249,8 +306,8 @@ func getExpirationTime(r *pb.Request) time.Time { return t } -// apply interprets r as a call to store.X and returns an Response interpreted from store.Event -func (s *EtcdServer) apply(r pb.Request) Response { +// applyRequest interprets r as a call to store.X and returns an Response interpreted from store.Event +func (s *EtcdServer) applyRequest(r pb.Request) Response { f := func(ev *store.Event, err error) Response { return Response{Event: ev, err: err} } @@ -290,6 +347,18 @@ func (s *EtcdServer) apply(r pb.Request) Response { } } +func (s *EtcdServer) applyConfig(r pb.Config) { + switch r.Type { + case configAddNode: + s.Node.AddNode(r.NodeID) + case configRemoveNode: + s.Node.RemoveNode(r.NodeID) + default: + // This should never be reached + panic("unsupported config type") + } +} + // TODO: non-blocking snapshot func (s *EtcdServer) snapshot() { d, err := s.Store.Save() diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index ee3039a48..ba48434f4 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -120,7 +120,7 @@ func TestDoBadLocalAction(t *testing.T) { } } -func TestApply(t *testing.T) { +func TestApplyRequest(t *testing.T) { tests := []struct { req pb.Request @@ -188,7 +188,7 @@ func TestApply(t *testing.T) { for i, tt := range tests { st := &storeRecorder{} srv := &EtcdServer{Store: st} - resp := srv.apply(tt.req) + resp := srv.applyRequest(tt.req) if !reflect.DeepEqual(resp, tt.wresp) { t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp) @@ -594,6 +594,46 @@ func TestRecvSlowSnapshot(t *testing.T) { } } +// TestAddNode tests AddNode could propose configuration and add node to raft. +func TestAddNode(t *testing.T) { + n := newNodeCommitterRecorder() + s := &EtcdServer{ + Node: n, + Store: &storeRecorder{}, + Send: func(_ []raftpb.Message) {}, + Storage: &storageRecorder{}, + } + s.Start() + s.AddNode(context.TODO(), 1, []byte("foo")) + action := n.Action() + s.Stop() + + waction := []string{"Configure", "AddNode"} + if !reflect.DeepEqual(action, waction) { + t.Errorf("action = %v, want %v", action, waction) + } +} + +// TestRemoveNode tests RemoveNode could propose configuration and remove node from raft. +func TestRemoveNode(t *testing.T) { + n := newNodeCommitterRecorder() + s := &EtcdServer{ + Node: n, + Store: &storeRecorder{}, + Send: func(_ []raftpb.Message) {}, + Storage: &storageRecorder{}, + } + s.Start() + s.RemoveNode(context.TODO(), 1) + action := n.Action() + s.Stop() + + waction := []string{"Configure", "RemoveNode"} + if !reflect.DeepEqual(action, waction) { + t.Errorf("action = %v, want %v", action, waction) + } +} + // TODO: test wait trigger correctness in multi-server case func TestGetBool(t *testing.T) { @@ -788,20 +828,27 @@ func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error { n.record("Propose") return nil } +func (n *nodeRecorder) Configure(ctx context.Context, data []byte) error { + n.record("Configure") + return nil +} func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error { n.record("Step") return nil } -func (n *nodeRecorder) Ready() <-chan raft.Ready { - n.record("Ready") - return nil -} +func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil } func (n *nodeRecorder) Stop() { n.record("Stop") } func (n *nodeRecorder) Compact(d []byte) { n.record("Compact") } +func (n *nodeRecorder) AddNode(id int64) { + n.record("AddNode") +} +func (n *nodeRecorder) RemoveNode(id int64) { + n.record("RemoveNode") +} type nodeProposeDataRecorder struct { nodeRecorder @@ -832,3 +879,25 @@ func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte) n.record("Propose blocked") return nil } + +type nodeCommitterRecorder struct { + nodeRecorder + readyc chan raft.Ready +} + +func newNodeCommitterRecorder() *nodeCommitterRecorder { + readyc := make(chan raft.Ready, 1) + readyc <- raft.Ready{SoftState: &raft.SoftState{RaftState: raft.StateLeader}} + return &nodeCommitterRecorder{readyc: readyc} +} +func (n *nodeCommitterRecorder) Propose(ctx context.Context, data []byte) error { + n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Data: data}}} + return n.nodeRecorder.Propose(ctx, data) +} +func (n *nodeCommitterRecorder) Configure(ctx context.Context, data []byte) error { + n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Type: raft.EntryConfig, Data: data}}} + return n.nodeRecorder.Configure(ctx, data) +} +func (n *nodeCommitterRecorder) Ready() <-chan raft.Ready { + return n.readyc +}