raft: use pb.Config instead of []byte for Configure

This commit is contained in:
Yicheng Qin 2014-09-22 14:38:21 -07:00
parent 4203569da2
commit dc36ae7058
7 changed files with 251 additions and 244 deletions

View File

@ -10,7 +10,6 @@
It has these top-level messages:
Request
Config
*/
package etcdserverpb
@ -28,39 +27,6 @@ var _ = proto.Marshal
var _ = &json.SyntaxError{}
var _ = math.Inf
type ConfigType int32
const (
ConfigAddNode ConfigType = 0
ConfigRemoveNode ConfigType = 1
)
var ConfigType_name = map[int32]string{
0: "ConfigAddNode",
1: "ConfigRemoveNode",
}
var ConfigType_value = map[string]int32{
"ConfigAddNode": 0,
"ConfigRemoveNode": 1,
}
func (x ConfigType) Enum() *ConfigType {
p := new(ConfigType)
*p = x
return p
}
func (x ConfigType) String() string {
return proto.EnumName(ConfigType_name, int32(x))
}
func (x *ConfigType) UnmarshalJSON(data []byte) error {
value, err := proto.UnmarshalJSONEnum(ConfigType_value, data, "ConfigType")
if err != nil {
return err
}
*x = ConfigType(value)
return nil
}
type Request struct {
Id int64 `protobuf:"varint,1,req,name=id" json:"id"`
Method string `protobuf:"bytes,2,req,name=method" json:"method"`
@ -84,20 +50,7 @@ func (m *Request) Reset() { *m = Request{} }
func (m *Request) String() string { return proto.CompactTextString(m) }
func (*Request) ProtoMessage() {}
type Config struct {
ID int64 `protobuf:"varint,1,req" json:"ID"`
Type ConfigType `protobuf:"varint,2,req,enum=etcdserverpb.ConfigType" json:"Type"`
NodeID int64 `protobuf:"varint,3,req" json:"NodeID"`
Context []byte `protobuf:"bytes,4,opt" json:"Context"`
XXX_unrecognized []byte `json:"-"`
}
func (m *Config) Reset() { *m = Config{} }
func (m *Config) String() string { return proto.CompactTextString(m) }
func (*Config) ProtoMessage() {}
func init() {
proto.RegisterEnum("etcdserverpb.ConfigType", ConfigType_name, ConfigType_value)
}
func (m *Request) Unmarshal(data []byte) error {
l := len(data)
@ -407,115 +360,6 @@ func (m *Request) Unmarshal(data []byte) error {
}
return nil
}
func (m *Config) Unmarshal(data []byte) error {
l := len(data)
index := 0
for index < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
switch fieldNum {
case 1:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
m.ID |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
m.Type |= (ConfigType(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
m.NodeID |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 4:
if wireType != 2 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
byteLen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
postIndex := index + byteLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Context = append(m.Context, data[index:postIndex]...)
index = postIndex
default:
var sizeOfWire int
for {
sizeOfWire++
wire >>= 7
if wire == 0 {
break
}
}
index -= sizeOfWire
skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:])
if err != nil {
return err
}
if (index + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...)
index += skippy
}
}
return nil
}
func (m *Request) Size() (n int) {
var l int
_ = l
@ -545,19 +389,6 @@ func (m *Request) Size() (n int) {
}
return n
}
func (m *Config) Size() (n int) {
var l int
_ = l
n += 1 + sovEtcdserver(uint64(m.ID))
n += 1 + sovEtcdserver(uint64(m.Type))
n += 1 + sovEtcdserver(uint64(m.NodeID))
l = len(m.Context)
n += 1 + l + sovEtcdserver(uint64(l))
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func sovEtcdserver(x uint64) (n int) {
for {
@ -673,39 +504,6 @@ func (m *Request) MarshalTo(data []byte) (n int, err error) {
}
return i, nil
}
func (m *Config) Marshal() (data []byte, err error) {
size := m.Size()
data = make([]byte, size)
n, err := m.MarshalTo(data)
if err != nil {
return nil, err
}
return data[:n], nil
}
func (m *Config) MarshalTo(data []byte) (n int, err error) {
var i int
_ = i
var l int
_ = l
data[i] = 0x8
i++
i = encodeVarintEtcdserver(data, i, uint64(m.ID))
data[i] = 0x10
i++
i = encodeVarintEtcdserver(data, i, uint64(m.Type))
data[i] = 0x18
i++
i = encodeVarintEtcdserver(data, i, uint64(m.NodeID))
data[i] = 0x22
i++
i = encodeVarintEtcdserver(data, i, uint64(len(m.Context)))
i += copy(data[i:], m.Context)
if m.XXX_unrecognized != nil {
i += copy(data[i:], m.XXX_unrecognized)
}
return i, nil
}
func encodeFixed64Etcdserver(data []byte, offset int, v uint64) int {
data[offset] = uint8(v)
data[offset+1] = uint8(v >> 8)

View File

@ -6,7 +6,6 @@ option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
option (gogoproto.goproto_enum_prefix_all) = false;
message Request {
required int64 id = 1 [(gogoproto.nullable) = false];
@ -25,15 +24,3 @@ message Request {
required bool quorum = 14 [(gogoproto.nullable) = false];
required int64 time = 15 [(gogoproto.nullable) = false];
}
enum ConfigType {
ConfigAddNode = 0;
ConfigRemoveNode = 1;
}
message Config {
required int64 ID = 1 [(gogoproto.nullable) = false];
required ConfigType Type = 2 [(gogoproto.nullable) = false];
required int64 NodeID = 3 [(gogoproto.nullable) = false];
optional bytes Context = 4 [(gogoproto.nullable) = false];
}

View File

@ -129,7 +129,7 @@ func (s *EtcdServer) run() {
}
s.w.Trigger(r.Id, s.applyRequest(r))
case raftpb.EntryConfig:
var c pb.Config
var c raftpb.Config
if err := c.Unmarshal(e.Data); err != nil {
panic("TODO: this is bad, what do we do about it?")
}
@ -231,9 +231,9 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
}
func (s *EtcdServer) AddNode(ctx context.Context, id int64, context []byte) error {
req := pb.Config{
req := raftpb.Config{
ID: GenID(),
Type: pb.ConfigAddNode,
Type: raftpb.ConfigAddNode,
NodeID: id,
Context: context,
}
@ -241,9 +241,9 @@ func (s *EtcdServer) AddNode(ctx context.Context, id int64, context []byte) erro
}
func (s *EtcdServer) RemoveNode(ctx context.Context, id int64) error {
req := pb.Config{
req := raftpb.Config{
ID: GenID(),
Type: pb.ConfigRemoveNode,
Type: raftpb.ConfigRemoveNode,
NodeID: id,
}
return s.configure(ctx, req)
@ -251,14 +251,13 @@ func (s *EtcdServer) RemoveNode(ctx context.Context, id int64) error {
// configure sends configuration change through consensus then performs it.
// It will block until the change is performed or there is an error.
func (s *EtcdServer) configure(ctx context.Context, r pb.Config) error {
data, err := r.Marshal()
if err != nil {
log.Printf("marshal request %#v error: %v", r, err)
func (s *EtcdServer) configure(ctx context.Context, r raftpb.Config) error {
ch := s.w.Register(r.ID)
if err := s.Node.Configure(ctx, r); err != nil {
log.Printf("configure error: %v", err)
s.w.Trigger(r.ID, nil)
return err
}
ch := s.w.Register(r.ID)
s.Node.Configure(ctx, data)
select {
case <-ch:
return nil
@ -342,11 +341,11 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
}
}
func (s *EtcdServer) applyConfig(r pb.Config) {
func (s *EtcdServer) applyConfig(r raftpb.Config) {
switch r.Type {
case pb.ConfigAddNode:
case raftpb.ConfigAddNode:
s.Node.AddNode(r.NodeID)
case pb.ConfigRemoveNode:
case raftpb.ConfigRemoveNode:
s.Node.RemoveNode(r.NodeID)
default:
// This should never be reached

View File

@ -802,16 +802,16 @@ func newReadyNode() *readyNode {
readyc := make(chan raft.Ready, 1)
return &readyNode{readyc: readyc}
}
func (n *readyNode) Tick() {}
func (n *readyNode) Campaign(ctx context.Context) error { return nil }
func (n *readyNode) Propose(ctx context.Context, data []byte) error { return nil }
func (n *readyNode) Configure(ctx context.Context, data []byte) error { return nil }
func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return nil }
func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
func (n *readyNode) Stop() {}
func (n *readyNode) Compact(d []byte) {}
func (n *readyNode) AddNode(id int64) {}
func (n *readyNode) RemoveNode(id int64) {}
func (n *readyNode) Tick() {}
func (n *readyNode) Campaign(ctx context.Context) error { return nil }
func (n *readyNode) Propose(ctx context.Context, data []byte) error { return nil }
func (n *readyNode) Configure(ctx context.Context, conf raftpb.Config) error { return nil }
func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return nil }
func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
func (n *readyNode) Stop() {}
func (n *readyNode) Compact(d []byte) {}
func (n *readyNode) AddNode(id int64) {}
func (n *readyNode) RemoveNode(id int64) {}
type nodeRecorder struct {
recorder
@ -828,7 +828,7 @@ func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error {
n.record("Propose")
return nil
}
func (n *nodeRecorder) Configure(ctx context.Context, data []byte) error {
func (n *nodeRecorder) Configure(ctx context.Context, conf raftpb.Config) error {
n.record("Configure")
return nil
}
@ -894,9 +894,13 @@ func (n *nodeCommitterRecorder) Propose(ctx context.Context, data []byte) error
n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Data: data}}}
return n.nodeRecorder.Propose(ctx, data)
}
func (n *nodeCommitterRecorder) Configure(ctx context.Context, data []byte) error {
func (n *nodeCommitterRecorder) Configure(ctx context.Context, conf raftpb.Config) error {
data, err := conf.Marshal()
if err != nil {
return err
}
n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfig, Data: data}}}
return n.nodeRecorder.Configure(ctx, data)
return n.nodeRecorder.Configure(ctx, conf)
}
func (n *nodeCommitterRecorder) Ready() <-chan raft.Ready {
return n.readyc

View File

@ -81,7 +81,8 @@ type Node interface {
// Propose proposes that data be appended to the log.
Propose(ctx context.Context, data []byte) error
// Configure proposes config change. Only one config can be in the process of going through consensus at a time.
Configure(ctx context.Context, data []byte) error
// Configure doesn't perform config change.
Configure(ctx context.Context, conf pb.Config) error
// Step advances the state machine using the given message. ctx.Err() will be returned, if any.
Step(ctx context.Context, msg pb.Message) error
// Ready returns a channel that returns the current point-in-time state
@ -246,7 +247,11 @@ func (n *node) Propose(ctx context.Context, data []byte) error {
return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Data: data}}})
}
func (n *node) Configure(ctx context.Context, data []byte) error {
func (n *node) Configure(ctx context.Context, conf pb.Config) error {
data, err := conf.Marshal()
if err != nil {
return err
}
return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfig, Data: data}}})
}

View File

@ -14,6 +14,7 @@
Snapshot
Message
HardState
Config
*/
package raftpb
@ -64,6 +65,39 @@ func (x *EntryType) UnmarshalJSON(data []byte) error {
return nil
}
type ConfigType int32
const (
ConfigAddNode ConfigType = 0
ConfigRemoveNode ConfigType = 1
)
var ConfigType_name = map[int32]string{
0: "ConfigAddNode",
1: "ConfigRemoveNode",
}
var ConfigType_value = map[string]int32{
"ConfigAddNode": 0,
"ConfigRemoveNode": 1,
}
func (x ConfigType) Enum() *ConfigType {
p := new(ConfigType)
*p = x
return p
}
func (x ConfigType) String() string {
return proto.EnumName(ConfigType_name, int32(x))
}
func (x *ConfigType) UnmarshalJSON(data []byte) error {
value, err := proto.UnmarshalJSONEnum(ConfigType_value, data, "ConfigType")
if err != nil {
return err
}
*x = ConfigType(value)
return nil
}
type Info struct {
Id int64 `protobuf:"varint,1,req,name=id" json:"id"`
XXX_unrecognized []byte `json:"-"`
@ -125,8 +159,21 @@ func (m *HardState) Reset() { *m = HardState{} }
func (m *HardState) String() string { return proto.CompactTextString(m) }
func (*HardState) ProtoMessage() {}
type Config struct {
ID int64 `protobuf:"varint,1,req" json:"ID"`
Type ConfigType `protobuf:"varint,2,req,enum=raftpb.ConfigType" json:"Type"`
NodeID int64 `protobuf:"varint,3,req" json:"NodeID"`
Context []byte `protobuf:"bytes,4,opt" json:"Context"`
XXX_unrecognized []byte `json:"-"`
}
func (m *Config) Reset() { *m = Config{} }
func (m *Config) String() string { return proto.CompactTextString(m) }
func (*Config) ProtoMessage() {}
func init() {
proto.RegisterEnum("raftpb.EntryType", EntryType_name, EntryType_value)
proto.RegisterEnum("raftpb.ConfigType", ConfigType_name, ConfigType_value)
}
func (m *Info) Unmarshal(data []byte) error {
l := len(data)
@ -686,6 +733,115 @@ func (m *HardState) Unmarshal(data []byte) error {
}
return nil
}
func (m *Config) Unmarshal(data []byte) error {
l := len(data)
index := 0
for index < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
switch fieldNum {
case 1:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
m.ID |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
m.Type |= (ConfigType(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
m.NodeID |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 4:
if wireType != 2 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
byteLen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
postIndex := index + byteLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Context = append(m.Context, data[index:postIndex]...)
index = postIndex
default:
var sizeOfWire int
for {
sizeOfWire++
wire >>= 7
if wire == 0 {
break
}
}
index -= sizeOfWire
skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:])
if err != nil {
return err
}
if (index + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...)
index += skippy
}
}
return nil
}
func (m *Info) Size() (n int) {
var l int
_ = l
@ -759,6 +915,19 @@ func (m *HardState) Size() (n int) {
}
return n
}
func (m *Config) Size() (n int) {
var l int
_ = l
n += 1 + sovRaft(uint64(m.ID))
n += 1 + sovRaft(uint64(m.Type))
n += 1 + sovRaft(uint64(m.NodeID))
l = len(m.Context)
n += 1 + l + sovRaft(uint64(l))
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func sovRaft(x uint64) (n int) {
for {
@ -962,6 +1131,39 @@ func (m *HardState) MarshalTo(data []byte) (n int, err error) {
}
return i, nil
}
func (m *Config) Marshal() (data []byte, err error) {
size := m.Size()
data = make([]byte, size)
n, err := m.MarshalTo(data)
if err != nil {
return nil, err
}
return data[:n], nil
}
func (m *Config) MarshalTo(data []byte) (n int, err error) {
var i int
_ = i
var l int
_ = l
data[i] = 0x8
i++
i = encodeVarintRaft(data, i, uint64(m.ID))
data[i] = 0x10
i++
i = encodeVarintRaft(data, i, uint64(m.Type))
data[i] = 0x18
i++
i = encodeVarintRaft(data, i, uint64(m.NodeID))
data[i] = 0x22
i++
i = encodeVarintRaft(data, i, uint64(len(m.Context)))
i += copy(data[i:], m.Context)
if m.XXX_unrecognized != nil {
i += copy(data[i:], m.XXX_unrecognized)
}
return i, nil
}
func encodeFixed64Raft(data []byte, offset int, v uint64) int {
data[offset] = uint8(v)
data[offset+1] = uint8(v >> 8)

View File

@ -48,3 +48,15 @@ message HardState {
required int64 vote = 2 [(gogoproto.nullable) = false];
required int64 commit = 3 [(gogoproto.nullable) = false];
}
enum ConfigType {
ConfigAddNode = 0;
ConfigRemoveNode = 1;
}
message Config {
required int64 ID = 1 [(gogoproto.nullable) = false];
required ConfigType Type = 2 [(gogoproto.nullable) = false];
required int64 NodeID = 3 [(gogoproto.nullable) = false];
optional bytes Context = 4 [(gogoproto.nullable) = false];
}