From dc36ae70585f3fa5021f2607ba15d262be81d9ed Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 22 Sep 2014 14:38:21 -0700 Subject: [PATCH] raft: use pb.Config instead of []byte for Configure --- etcdserver/etcdserverpb/etcdserver.pb.go | 202 ----------------------- etcdserver/etcdserverpb/etcdserver.proto | 13 -- etcdserver/server.go | 27 ++- etcdserver/server_test.go | 30 ++-- raft/node.go | 9 +- raft/raftpb/raft.pb.go | 202 +++++++++++++++++++++++ raft/raftpb/raft.proto | 12 ++ 7 files changed, 251 insertions(+), 244 deletions(-) diff --git a/etcdserver/etcdserverpb/etcdserver.pb.go b/etcdserver/etcdserverpb/etcdserver.pb.go index f0157bdce..a4176a31f 100644 --- a/etcdserver/etcdserverpb/etcdserver.pb.go +++ b/etcdserver/etcdserverpb/etcdserver.pb.go @@ -10,7 +10,6 @@ It has these top-level messages: Request - Config */ package etcdserverpb @@ -28,39 +27,6 @@ var _ = proto.Marshal var _ = &json.SyntaxError{} var _ = math.Inf -type ConfigType int32 - -const ( - ConfigAddNode ConfigType = 0 - ConfigRemoveNode ConfigType = 1 -) - -var ConfigType_name = map[int32]string{ - 0: "ConfigAddNode", - 1: "ConfigRemoveNode", -} -var ConfigType_value = map[string]int32{ - "ConfigAddNode": 0, - "ConfigRemoveNode": 1, -} - -func (x ConfigType) Enum() *ConfigType { - p := new(ConfigType) - *p = x - return p -} -func (x ConfigType) String() string { - return proto.EnumName(ConfigType_name, int32(x)) -} -func (x *ConfigType) UnmarshalJSON(data []byte) error { - value, err := proto.UnmarshalJSONEnum(ConfigType_value, data, "ConfigType") - if err != nil { - return err - } - *x = ConfigType(value) - return nil -} - type Request struct { Id int64 `protobuf:"varint,1,req,name=id" json:"id"` Method string `protobuf:"bytes,2,req,name=method" json:"method"` @@ -84,20 +50,7 @@ func (m *Request) Reset() { *m = Request{} } func (m *Request) String() string { return proto.CompactTextString(m) } func (*Request) ProtoMessage() {} -type Config struct { - ID int64 `protobuf:"varint,1,req" json:"ID"` - Type ConfigType `protobuf:"varint,2,req,enum=etcdserverpb.ConfigType" json:"Type"` - NodeID int64 `protobuf:"varint,3,req" json:"NodeID"` - Context []byte `protobuf:"bytes,4,opt" json:"Context"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *Config) Reset() { *m = Config{} } -func (m *Config) String() string { return proto.CompactTextString(m) } -func (*Config) ProtoMessage() {} - func init() { - proto.RegisterEnum("etcdserverpb.ConfigType", ConfigType_name, ConfigType_value) } func (m *Request) Unmarshal(data []byte) error { l := len(data) @@ -407,115 +360,6 @@ func (m *Request) Unmarshal(data []byte) error { } return nil } -func (m *Config) Unmarshal(data []byte) error { - l := len(data) - index := 0 - for index < l { - var wire uint64 - for shift := uint(0); ; shift += 7 { - if index >= l { - return io.ErrUnexpectedEOF - } - b := data[index] - index++ - wire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - switch fieldNum { - case 1: - if wireType != 0 { - return code_google_com_p_gogoprotobuf_proto.ErrWrongType - } - for shift := uint(0); ; shift += 7 { - if index >= l { - return io.ErrUnexpectedEOF - } - b := data[index] - index++ - m.ID |= (int64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - case 2: - if wireType != 0 { - return code_google_com_p_gogoprotobuf_proto.ErrWrongType - } - for shift := uint(0); ; shift += 7 { - if index >= l { - return io.ErrUnexpectedEOF - } - b := data[index] - index++ - m.Type |= (ConfigType(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - case 3: - if wireType != 0 { - return code_google_com_p_gogoprotobuf_proto.ErrWrongType - } - for shift := uint(0); ; shift += 7 { - if index >= l { - return io.ErrUnexpectedEOF - } - b := data[index] - index++ - m.NodeID |= (int64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - case 4: - if wireType != 2 { - return code_google_com_p_gogoprotobuf_proto.ErrWrongType - } - var byteLen int - for shift := uint(0); ; shift += 7 { - if index >= l { - return io.ErrUnexpectedEOF - } - b := data[index] - index++ - byteLen |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - postIndex := index + byteLen - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Context = append(m.Context, data[index:postIndex]...) - index = postIndex - default: - var sizeOfWire int - for { - sizeOfWire++ - wire >>= 7 - if wire == 0 { - break - } - } - index -= sizeOfWire - skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:]) - if err != nil { - return err - } - if (index + skippy) > l { - return io.ErrUnexpectedEOF - } - m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...) - index += skippy - } - } - return nil -} func (m *Request) Size() (n int) { var l int _ = l @@ -545,19 +389,6 @@ func (m *Request) Size() (n int) { } return n } -func (m *Config) Size() (n int) { - var l int - _ = l - n += 1 + sovEtcdserver(uint64(m.ID)) - n += 1 + sovEtcdserver(uint64(m.Type)) - n += 1 + sovEtcdserver(uint64(m.NodeID)) - l = len(m.Context) - n += 1 + l + sovEtcdserver(uint64(l)) - if m.XXX_unrecognized != nil { - n += len(m.XXX_unrecognized) - } - return n -} func sovEtcdserver(x uint64) (n int) { for { @@ -673,39 +504,6 @@ func (m *Request) MarshalTo(data []byte) (n int, err error) { } return i, nil } -func (m *Config) Marshal() (data []byte, err error) { - size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) - if err != nil { - return nil, err - } - return data[:n], nil -} - -func (m *Config) MarshalTo(data []byte) (n int, err error) { - var i int - _ = i - var l int - _ = l - data[i] = 0x8 - i++ - i = encodeVarintEtcdserver(data, i, uint64(m.ID)) - data[i] = 0x10 - i++ - i = encodeVarintEtcdserver(data, i, uint64(m.Type)) - data[i] = 0x18 - i++ - i = encodeVarintEtcdserver(data, i, uint64(m.NodeID)) - data[i] = 0x22 - i++ - i = encodeVarintEtcdserver(data, i, uint64(len(m.Context))) - i += copy(data[i:], m.Context) - if m.XXX_unrecognized != nil { - i += copy(data[i:], m.XXX_unrecognized) - } - return i, nil -} func encodeFixed64Etcdserver(data []byte, offset int, v uint64) int { data[offset] = uint8(v) data[offset+1] = uint8(v >> 8) diff --git a/etcdserver/etcdserverpb/etcdserver.proto b/etcdserver/etcdserverpb/etcdserver.proto index 243dc6fe4..bb34ac250 100644 --- a/etcdserver/etcdserverpb/etcdserver.proto +++ b/etcdserver/etcdserverpb/etcdserver.proto @@ -6,7 +6,6 @@ option (gogoproto.marshaler_all) = true; option (gogoproto.sizer_all) = true; option (gogoproto.unmarshaler_all) = true; option (gogoproto.goproto_getters_all) = false; -option (gogoproto.goproto_enum_prefix_all) = false; message Request { required int64 id = 1 [(gogoproto.nullable) = false]; @@ -25,15 +24,3 @@ message Request { required bool quorum = 14 [(gogoproto.nullable) = false]; required int64 time = 15 [(gogoproto.nullable) = false]; } - -enum ConfigType { - ConfigAddNode = 0; - ConfigRemoveNode = 1; -} - -message Config { - required int64 ID = 1 [(gogoproto.nullable) = false]; - required ConfigType Type = 2 [(gogoproto.nullable) = false]; - required int64 NodeID = 3 [(gogoproto.nullable) = false]; - optional bytes Context = 4 [(gogoproto.nullable) = false]; -} diff --git a/etcdserver/server.go b/etcdserver/server.go index 4eac5a5b9..c19d62356 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -129,7 +129,7 @@ func (s *EtcdServer) run() { } s.w.Trigger(r.Id, s.applyRequest(r)) case raftpb.EntryConfig: - var c pb.Config + var c raftpb.Config if err := c.Unmarshal(e.Data); err != nil { panic("TODO: this is bad, what do we do about it?") } @@ -231,9 +231,9 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { } func (s *EtcdServer) AddNode(ctx context.Context, id int64, context []byte) error { - req := pb.Config{ + req := raftpb.Config{ ID: GenID(), - Type: pb.ConfigAddNode, + Type: raftpb.ConfigAddNode, NodeID: id, Context: context, } @@ -241,9 +241,9 @@ func (s *EtcdServer) AddNode(ctx context.Context, id int64, context []byte) erro } func (s *EtcdServer) RemoveNode(ctx context.Context, id int64) error { - req := pb.Config{ + req := raftpb.Config{ ID: GenID(), - Type: pb.ConfigRemoveNode, + Type: raftpb.ConfigRemoveNode, NodeID: id, } return s.configure(ctx, req) @@ -251,14 +251,13 @@ func (s *EtcdServer) RemoveNode(ctx context.Context, id int64) error { // configure sends configuration change through consensus then performs it. // It will block until the change is performed or there is an error. -func (s *EtcdServer) configure(ctx context.Context, r pb.Config) error { - data, err := r.Marshal() - if err != nil { - log.Printf("marshal request %#v error: %v", r, err) +func (s *EtcdServer) configure(ctx context.Context, r raftpb.Config) error { + ch := s.w.Register(r.ID) + if err := s.Node.Configure(ctx, r); err != nil { + log.Printf("configure error: %v", err) + s.w.Trigger(r.ID, nil) return err } - ch := s.w.Register(r.ID) - s.Node.Configure(ctx, data) select { case <-ch: return nil @@ -342,11 +341,11 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response { } } -func (s *EtcdServer) applyConfig(r pb.Config) { +func (s *EtcdServer) applyConfig(r raftpb.Config) { switch r.Type { - case pb.ConfigAddNode: + case raftpb.ConfigAddNode: s.Node.AddNode(r.NodeID) - case pb.ConfigRemoveNode: + case raftpb.ConfigRemoveNode: s.Node.RemoveNode(r.NodeID) default: // This should never be reached diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 87845559c..34f88c81b 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -802,16 +802,16 @@ func newReadyNode() *readyNode { readyc := make(chan raft.Ready, 1) return &readyNode{readyc: readyc} } -func (n *readyNode) Tick() {} -func (n *readyNode) Campaign(ctx context.Context) error { return nil } -func (n *readyNode) Propose(ctx context.Context, data []byte) error { return nil } -func (n *readyNode) Configure(ctx context.Context, data []byte) error { return nil } -func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return nil } -func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc } -func (n *readyNode) Stop() {} -func (n *readyNode) Compact(d []byte) {} -func (n *readyNode) AddNode(id int64) {} -func (n *readyNode) RemoveNode(id int64) {} +func (n *readyNode) Tick() {} +func (n *readyNode) Campaign(ctx context.Context) error { return nil } +func (n *readyNode) Propose(ctx context.Context, data []byte) error { return nil } +func (n *readyNode) Configure(ctx context.Context, conf raftpb.Config) error { return nil } +func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return nil } +func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc } +func (n *readyNode) Stop() {} +func (n *readyNode) Compact(d []byte) {} +func (n *readyNode) AddNode(id int64) {} +func (n *readyNode) RemoveNode(id int64) {} type nodeRecorder struct { recorder @@ -828,7 +828,7 @@ func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error { n.record("Propose") return nil } -func (n *nodeRecorder) Configure(ctx context.Context, data []byte) error { +func (n *nodeRecorder) Configure(ctx context.Context, conf raftpb.Config) error { n.record("Configure") return nil } @@ -894,9 +894,13 @@ func (n *nodeCommitterRecorder) Propose(ctx context.Context, data []byte) error n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Data: data}}} return n.nodeRecorder.Propose(ctx, data) } -func (n *nodeCommitterRecorder) Configure(ctx context.Context, data []byte) error { +func (n *nodeCommitterRecorder) Configure(ctx context.Context, conf raftpb.Config) error { + data, err := conf.Marshal() + if err != nil { + return err + } n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfig, Data: data}}} - return n.nodeRecorder.Configure(ctx, data) + return n.nodeRecorder.Configure(ctx, conf) } func (n *nodeCommitterRecorder) Ready() <-chan raft.Ready { return n.readyc diff --git a/raft/node.go b/raft/node.go index 43c55b8f1..46001b1a3 100644 --- a/raft/node.go +++ b/raft/node.go @@ -81,7 +81,8 @@ type Node interface { // Propose proposes that data be appended to the log. Propose(ctx context.Context, data []byte) error // Configure proposes config change. Only one config can be in the process of going through consensus at a time. - Configure(ctx context.Context, data []byte) error + // Configure doesn't perform config change. + Configure(ctx context.Context, conf pb.Config) error // Step advances the state machine using the given message. ctx.Err() will be returned, if any. Step(ctx context.Context, msg pb.Message) error // Ready returns a channel that returns the current point-in-time state @@ -246,7 +247,11 @@ func (n *node) Propose(ctx context.Context, data []byte) error { return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Data: data}}}) } -func (n *node) Configure(ctx context.Context, data []byte) error { +func (n *node) Configure(ctx context.Context, conf pb.Config) error { + data, err := conf.Marshal() + if err != nil { + return err + } return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfig, Data: data}}}) } diff --git a/raft/raftpb/raft.pb.go b/raft/raftpb/raft.pb.go index c12a97417..1ba9a88d6 100644 --- a/raft/raftpb/raft.pb.go +++ b/raft/raftpb/raft.pb.go @@ -14,6 +14,7 @@ Snapshot Message HardState + Config */ package raftpb @@ -64,6 +65,39 @@ func (x *EntryType) UnmarshalJSON(data []byte) error { return nil } +type ConfigType int32 + +const ( + ConfigAddNode ConfigType = 0 + ConfigRemoveNode ConfigType = 1 +) + +var ConfigType_name = map[int32]string{ + 0: "ConfigAddNode", + 1: "ConfigRemoveNode", +} +var ConfigType_value = map[string]int32{ + "ConfigAddNode": 0, + "ConfigRemoveNode": 1, +} + +func (x ConfigType) Enum() *ConfigType { + p := new(ConfigType) + *p = x + return p +} +func (x ConfigType) String() string { + return proto.EnumName(ConfigType_name, int32(x)) +} +func (x *ConfigType) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(ConfigType_value, data, "ConfigType") + if err != nil { + return err + } + *x = ConfigType(value) + return nil +} + type Info struct { Id int64 `protobuf:"varint,1,req,name=id" json:"id"` XXX_unrecognized []byte `json:"-"` @@ -125,8 +159,21 @@ func (m *HardState) Reset() { *m = HardState{} } func (m *HardState) String() string { return proto.CompactTextString(m) } func (*HardState) ProtoMessage() {} +type Config struct { + ID int64 `protobuf:"varint,1,req" json:"ID"` + Type ConfigType `protobuf:"varint,2,req,enum=raftpb.ConfigType" json:"Type"` + NodeID int64 `protobuf:"varint,3,req" json:"NodeID"` + Context []byte `protobuf:"bytes,4,opt" json:"Context"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Config) Reset() { *m = Config{} } +func (m *Config) String() string { return proto.CompactTextString(m) } +func (*Config) ProtoMessage() {} + func init() { proto.RegisterEnum("raftpb.EntryType", EntryType_name, EntryType_value) + proto.RegisterEnum("raftpb.ConfigType", ConfigType_name, ConfigType_value) } func (m *Info) Unmarshal(data []byte) error { l := len(data) @@ -686,6 +733,115 @@ func (m *HardState) Unmarshal(data []byte) error { } return nil } +func (m *Config) Unmarshal(data []byte) error { + l := len(data) + index := 0 + for index < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.ID |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.Type |= (ConfigType(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.NodeID |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 2 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := index + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Context = append(m.Context, data[index:postIndex]...) + index = postIndex + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + index -= sizeOfWire + skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:]) + if err != nil { + return err + } + if (index + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...) + index += skippy + } + } + return nil +} func (m *Info) Size() (n int) { var l int _ = l @@ -759,6 +915,19 @@ func (m *HardState) Size() (n int) { } return n } +func (m *Config) Size() (n int) { + var l int + _ = l + n += 1 + sovRaft(uint64(m.ID)) + n += 1 + sovRaft(uint64(m.Type)) + n += 1 + sovRaft(uint64(m.NodeID)) + l = len(m.Context) + n += 1 + l + sovRaft(uint64(l)) + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} func sovRaft(x uint64) (n int) { for { @@ -962,6 +1131,39 @@ func (m *HardState) MarshalTo(data []byte) (n int, err error) { } return i, nil } +func (m *Config) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *Config) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + data[i] = 0x8 + i++ + i = encodeVarintRaft(data, i, uint64(m.ID)) + data[i] = 0x10 + i++ + i = encodeVarintRaft(data, i, uint64(m.Type)) + data[i] = 0x18 + i++ + i = encodeVarintRaft(data, i, uint64(m.NodeID)) + data[i] = 0x22 + i++ + i = encodeVarintRaft(data, i, uint64(len(m.Context))) + i += copy(data[i:], m.Context) + if m.XXX_unrecognized != nil { + i += copy(data[i:], m.XXX_unrecognized) + } + return i, nil +} func encodeFixed64Raft(data []byte, offset int, v uint64) int { data[offset] = uint8(v) data[offset+1] = uint8(v >> 8) diff --git a/raft/raftpb/raft.proto b/raft/raftpb/raft.proto index 6eff384ca..38f0db45f 100644 --- a/raft/raftpb/raft.proto +++ b/raft/raftpb/raft.proto @@ -48,3 +48,15 @@ message HardState { required int64 vote = 2 [(gogoproto.nullable) = false]; required int64 commit = 3 [(gogoproto.nullable) = false]; } + +enum ConfigType { + ConfigAddNode = 0; + ConfigRemoveNode = 1; +} + +message Config { + required int64 ID = 1 [(gogoproto.nullable) = false]; + required ConfigType Type = 2 [(gogoproto.nullable) = false]; + required int64 NodeID = 3 [(gogoproto.nullable) = false]; + optional bytes Context = 4 [(gogoproto.nullable) = false]; +}