From 58503817ecd08c23e93ca2c538120bd481cd6754 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 15 Apr 2015 10:58:56 -0700 Subject: [PATCH] etcdserver: internal request union --- etcdserver/etcdserverpb/etcdserver.pb.go | 142 +++++++++++++++++++++++ etcdserver/etcdserverpb/etcdserver.proto | 9 ++ etcdserver/server.go | 20 +++- etcdserver/server_test.go | 10 +- pkg/pbutil/pbutil.go | 7 ++ 5 files changed, 180 insertions(+), 8 deletions(-) diff --git a/etcdserver/etcdserverpb/etcdserver.pb.go b/etcdserver/etcdserverpb/etcdserver.pb.go index 518c31b62..3fc62b854 100644 --- a/etcdserver/etcdserverpb/etcdserver.pb.go +++ b/etcdserver/etcdserverpb/etcdserver.pb.go @@ -12,6 +12,7 @@ It has these top-level messages: Request Metadata + InternalRaftRequest */ package etcdserverpb @@ -61,6 +62,17 @@ func (m *Metadata) Reset() { *m = Metadata{} } func (m *Metadata) String() string { return proto.CompactTextString(m) } func (*Metadata) ProtoMessage() {} +// An InternalRaftRequest is the union of all requests which can be +// sent via raft. +type InternalRaftRequest struct { + V2 *Request `protobuf:"bytes,1,opt,name=v2" json:"v2,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *InternalRaftRequest) Reset() { *m = InternalRaftRequest{} } +func (m *InternalRaftRequest) String() string { return proto.CompactTextString(m) } +func (*InternalRaftRequest) ProtoMessage() {} + func init() { } func (m *Request) Unmarshal(data []byte) error { @@ -462,6 +474,76 @@ func (m *Metadata) Unmarshal(data []byte) error { return nil } +func (m *InternalRaftRequest) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field V2", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.V2 == nil { + m.V2 = &Request{} + } + if err := m.V2.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + iNdEx -= sizeOfWire + skippy, err := skipEtcdserver(data[iNdEx:]) + if err != nil { + return err + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, data[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + return nil +} func skipEtcdserver(data []byte) (n int, err error) { l := len(data) iNdEx := 0 @@ -546,6 +628,22 @@ func skipEtcdserver(data []byte) (n int, err error) { } panic("unreachable") } +func (this *InternalRaftRequest) GetValue() interface{} { + if this.V2 != nil { + return this.V2 + } + return nil +} + +func (this *InternalRaftRequest) SetValue(value interface{}) bool { + switch vt := value.(type) { + case *Request: + this.V2 = vt + default: + return false + } + return true +} func (m *Request) Size() (n int) { var l int _ = l @@ -588,6 +686,19 @@ func (m *Metadata) Size() (n int) { return n } +func (m *InternalRaftRequest) Size() (n int) { + var l int + _ = l + if m.V2 != nil { + l = m.V2.Size() + n += 1 + l + sovEtcdserver(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + func sovEtcdserver(x uint64) (n int) { for { n++ @@ -740,6 +851,37 @@ func (m *Metadata) MarshalTo(data []byte) (n int, err error) { return i, nil } +func (m *InternalRaftRequest) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *InternalRaftRequest) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + if m.V2 != nil { + data[i] = 0xa + i++ + i = encodeVarintEtcdserver(data, i, uint64(m.V2.Size())) + n1, err := m.V2.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n1 + } + if m.XXX_unrecognized != nil { + i += copy(data[i:], m.XXX_unrecognized) + } + return i, nil +} + func encodeFixed64Etcdserver(data []byte, offset int, v uint64) int { data[offset] = uint8(v) data[offset+1] = uint8(v >> 8) diff --git a/etcdserver/etcdserverpb/etcdserver.proto b/etcdserver/etcdserverpb/etcdserver.proto index bfc29625c..ee989d9c3 100644 --- a/etcdserver/etcdserverpb/etcdserver.proto +++ b/etcdserver/etcdserverpb/etcdserver.proto @@ -31,3 +31,12 @@ message Metadata { optional uint64 NodeID = 1 [(gogoproto.nullable) = false]; optional uint64 ClusterID = 2 [(gogoproto.nullable) = false]; } + +// An InternalRaftRequest is the union of all requests which can be +// sent via raft. +message InternalRaftRequest { + option (gogoproto.onlyone) = true; + oneof value { + Request v2 = 1; + } +} diff --git a/etcdserver/server.go b/etcdserver/server.go index 576abf227..330029bda 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -521,7 +521,9 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { } switch r.Method { case "POST", "PUT", "DELETE", "QGET": - data, err := r.Marshal() + var raftReq pb.InternalRaftRequest + raftReq.V2 = &r + data, err := raftReq.Marshal() if err != nil { return Response{}, err } @@ -741,9 +743,19 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint } break } - var r pb.Request - pbutil.MustUnmarshal(&r, e.Data) - s.w.Trigger(r.ID, s.applyRequest(r)) + + var raftReq pb.InternalRaftRequest + if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible + var r pb.Request + pbutil.MustUnmarshal(&r, e.Data) + s.w.Trigger(r.ID, s.applyRequest(r)) + } else { + switch { + case raftReq.V2 != nil: + req := raftReq.V2 + s.w.Trigger(req.ID, s.applyRequest(*req)) + } + } case raftpb.EntryConfChange: var cc raftpb.ConfChange pbutil.MustUnmarshal(&cc, e.Data) diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 71c4b4ea8..a19d8730f 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -982,10 +982,11 @@ func TestPublish(t *testing.T) { t.Fatalf("action = %s, want Propose", action[0].Name) } data := action[0].Params[0].([]byte) - var r pb.Request - if err := r.Unmarshal(data); err != nil { + var rr pb.InternalRaftRequest + if err := rr.Unmarshal(data); err != nil { t.Fatalf("unmarshal request error: %v", err) } + r := rr.V2 if r.Method != "PUT" { t.Errorf("method = %s, want PUT", r.Method) } @@ -1062,10 +1063,11 @@ func TestUpdateVersion(t *testing.T) { t.Fatalf("action = %s, want Propose", action[0].Name) } data := action[0].Params[0].([]byte) - var r pb.Request - if err := r.Unmarshal(data); err != nil { + var rr pb.InternalRaftRequest + if err := rr.Unmarshal(data); err != nil { t.Fatalf("unmarshal request error: %v", err) } + r := rr.V2 if r.Method != "PUT" { t.Errorf("method = %s, want PUT", r.Method) } diff --git a/pkg/pbutil/pbutil.go b/pkg/pbutil/pbutil.go index 5e154cace..66693a2f2 100644 --- a/pkg/pbutil/pbutil.go +++ b/pkg/pbutil/pbutil.go @@ -42,6 +42,13 @@ func MustUnmarshal(um Unmarshaler, data []byte) { } } +func MaybeUnmarshal(um Unmarshaler, data []byte) bool { + if err := um.Unmarshal(data); err != nil { + return false + } + return true +} + func GetBool(v *bool) (vv bool, set bool) { if v == nil { return false, false