diff --git a/etcdctlv3/command/txn_command.go b/etcdctlv3/command/txn_command.go new file mode 100644 index 000000000..14950724f --- /dev/null +++ b/etcdctlv3/command/txn_command.go @@ -0,0 +1,221 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "bufio" + "fmt" + "os" + "strconv" + "strings" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +// NewTxnCommand returns the CLI command for "txn". +func NewTxnCommand() cli.Command { + return cli.Command{ + Name: "txn", + Action: func(c *cli.Context) { + txnCommandFunc(c) + }, + } +} + +// txnCommandFunc executes the "txn" command. +func txnCommandFunc(c *cli.Context) { + if len(c.Args()) != 0 { + panic("unexpected args") + } + + reader := bufio.NewReader(os.Stdin) + + next := compareState + txn := &pb.TxnRequest{} + for next != nil { + next = next(txn, reader) + } + + conn, err := grpc.Dial("127.0.0.1:12379") + if err != nil { + panic(err) + } + etcd := pb.NewEtcdClient(conn) + + resp, err := etcd.Txn(context.Background(), txn) + if err != nil { + fmt.Println(err) + } + if resp.Succeeded { + fmt.Println("executed success request list") + } else { + fmt.Println("executed failure request list") + } +} + +type stateFunc func(txn *pb.TxnRequest, r *bufio.Reader) stateFunc + +func compareState(txn *pb.TxnRequest, r *bufio.Reader) stateFunc { + fmt.Println("entry comparison[key target expected_result compare_value] (end with empty line):") + + line, err := r.ReadString('\n') + if err != nil { + os.Exit(1) + } + + if len(line) == 1 { + return successState + } + + // remove trialling \n + line = line[:len(line)-1] + c, err := parseCompare(line) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + txn.Compare = append(txn.Compare, c) + + return compareState +} + +func successState(txn *pb.TxnRequest, r *bufio.Reader) stateFunc { + fmt.Println("entry success request[method key value(end_range)] (end with empty line):") + + line, err := r.ReadString('\n') + if err != nil { + os.Exit(1) + } + + if len(line) == 1 { + return failureState + } + + // remove trialling \n + line = line[:len(line)-1] + ru, err := parseRequestUnion(line) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + txn.Success = append(txn.Success, ru) + + return successState +} + +func failureState(txn *pb.TxnRequest, r *bufio.Reader) stateFunc { + fmt.Println("entry failure request[method key value(end_range)] (end with empty line):") + + line, err := r.ReadString('\n') + if err != nil { + os.Exit(1) + } + + if len(line) == 1 { + return nil + } + + // remove trialling \n + line = line[:len(line)-1] + ru, err := parseRequestUnion(line) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + txn.Failure = append(txn.Failure, ru) + + return failureState +} + +func parseRequestUnion(line string) (*pb.RequestUnion, error) { + parts := strings.Split(line, " ") + if len(parts) < 2 { + return nil, fmt.Errorf("invalid txn compare request: %s", line) + } + + ru := &pb.RequestUnion{} + key := []byte(parts[1]) + switch parts[0] { + case "r", "range": + ru.RequestRange = &pb.RangeRequest{Key: key} + if len(parts) == 3 { + ru.RequestRange.RangeEnd = []byte(parts[2]) + } + case "p", "put": + ru.RequestPut = &pb.PutRequest{Key: key, Value: []byte(parts[2])} + case "d", "deleteRange": + ru.RequestDeleteRange = &pb.DeleteRangeRequest{Key: key} + if len(parts) == 3 { + ru.RequestRange.RangeEnd = []byte(parts[2]) + } + default: + return nil, fmt.Errorf("invalid txn request: %s", line) + } + return ru, nil +} + +func parseCompare(line string) (*pb.Compare, error) { + parts := strings.Split(line, " ") + if len(parts) != 4 { + return nil, fmt.Errorf("invalid txn compare request: %s", line) + } + + var err error + c := &pb.Compare{} + c.Key = []byte(parts[0]) + switch parts[1] { + case "ver", "version": + c.Target = pb.Compare_VERSION + c.Version, err = strconv.ParseInt(parts[3], 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid txn compare request: %s", line) + } + case "c", "create": + c.Target = pb.Compare_CREATE + c.CreateIndex, err = strconv.ParseInt(parts[3], 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid txn compare request: %s", line) + } + case "m", "mod": + c.Target = pb.Compare_MOD + c.ModIndex, err = strconv.ParseInt(parts[3], 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid txn compare request: %s", line) + } + case "val", "value": + c.Target = pb.Compare_VALUE + c.Value = []byte(parts[3]) + default: + return nil, fmt.Errorf("invalid txn compare request: %s", line) + } + + switch parts[2] { + case "g", "greater": + c.Result = pb.Compare_GREATER + case "e", "equal": + c.Result = pb.Compare_EQUAL + case "l", "less": + c.Result = pb.Compare_LESS + default: + return nil, fmt.Errorf("invalid txn compare request: %s", line) + } + return c, nil +} diff --git a/etcdctlv3/main.go b/etcdctlv3/main.go index bf7a5d07b..0b09152a8 100644 --- a/etcdctlv3/main.go +++ b/etcdctlv3/main.go @@ -31,6 +31,7 @@ func main() { command.NewRangeCommand(), command.NewPutCommand(), command.NewDeleteRangeCommand(), + command.NewTxnCommand(), } app.Run(os.Args) diff --git a/etcdserver/api/v3rpc/key.go b/etcdserver/api/v3rpc/key.go index b2af39908..5376eb1ef 100644 --- a/etcdserver/api/v3rpc/key.go +++ b/etcdserver/api/v3rpc/key.go @@ -44,8 +44,8 @@ func (h *handler) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*p } func (h *handler) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { - panic("not implemented") - return nil, nil + resp := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Txn: r}) + return resp.(*pb.TxnResponse), nil } func (h *handler) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { diff --git a/etcdserver/etcdserverpb/rpc.pb.go b/etcdserver/etcdserverpb/rpc.pb.go index b55603716..fa208a30c 100644 --- a/etcdserver/etcdserverpb/rpc.pb.go +++ b/etcdserver/etcdserverpb/rpc.pb.go @@ -24,27 +24,53 @@ var _ grpc.ClientConn // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal -type Compare_CompareType int32 +type Compare_CompareResult int32 const ( - Compare_EQUAL Compare_CompareType = 0 - Compare_GREATER Compare_CompareType = 1 - Compare_LESS Compare_CompareType = 2 + Compare_EQUAL Compare_CompareResult = 0 + Compare_GREATER Compare_CompareResult = 1 + Compare_LESS Compare_CompareResult = 2 ) -var Compare_CompareType_name = map[int32]string{ +var Compare_CompareResult_name = map[int32]string{ 0: "EQUAL", 1: "GREATER", 2: "LESS", } -var Compare_CompareType_value = map[string]int32{ +var Compare_CompareResult_value = map[string]int32{ "EQUAL": 0, "GREATER": 1, "LESS": 2, } -func (x Compare_CompareType) String() string { - return proto.EnumName(Compare_CompareType_name, int32(x)) +func (x Compare_CompareResult) String() string { + return proto.EnumName(Compare_CompareResult_name, int32(x)) +} + +type Compare_CompareTarget int32 + +const ( + Compare_VERSION Compare_CompareTarget = 0 + Compare_CREATE Compare_CompareTarget = 1 + Compare_MOD Compare_CompareTarget = 2 + Compare_VALUE Compare_CompareTarget = 3 +) + +var Compare_CompareTarget_name = map[int32]string{ + 0: "VERSION", + 1: "CREATE", + 2: "MOD", + 3: "VALUE", +} +var Compare_CompareTarget_value = map[string]int32{ + "VERSION": 0, + "CREATE": 1, + "MOD": 2, + "VALUE": 3, +} + +func (x Compare_CompareTarget) String() string { + return proto.EnumName(Compare_CompareTarget_name, int32(x)) } type ResponseHeader struct { @@ -184,7 +210,7 @@ func (m *RequestUnion) GetRequestDeleteRange() *DeleteRangeRequest { } type ResponseUnion struct { - ReponseRange *RangeResponse `protobuf:"bytes,1,opt,name=reponse_range" json:"reponse_range,omitempty"` + ResponseRange *RangeResponse `protobuf:"bytes,1,opt,name=response_range" json:"response_range,omitempty"` ResponsePut *PutResponse `protobuf:"bytes,2,opt,name=response_put" json:"response_put,omitempty"` ResponseDeleteRange *DeleteRangeResponse `protobuf:"bytes,3,opt,name=response_delete_range" json:"response_delete_range,omitempty"` } @@ -193,9 +219,9 @@ func (m *ResponseUnion) Reset() { *m = ResponseUnion{} } func (m *ResponseUnion) String() string { return proto.CompactTextString(m) } func (*ResponseUnion) ProtoMessage() {} -func (m *ResponseUnion) GetReponseRange() *RangeResponse { +func (m *ResponseUnion) GetResponseRange() *RangeResponse { if m != nil { - return m.ReponseRange + return m.ResponseRange } return nil } @@ -215,17 +241,18 @@ func (m *ResponseUnion) GetResponseDeleteRange() *DeleteRangeResponse { } type Compare struct { - Type Compare_CompareType `protobuf:"varint,1,opt,name=type,proto3,enum=etcdserverpb.Compare_CompareType" json:"type,omitempty"` + Result Compare_CompareResult `protobuf:"varint,1,opt,name=result,proto3,enum=etcdserverpb.Compare_CompareResult" json:"result,omitempty"` + Target Compare_CompareTarget `protobuf:"varint,2,opt,name=target,proto3,enum=etcdserverpb.Compare_CompareTarget" json:"target,omitempty"` // key path - Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` + Key []byte `protobuf:"bytes,3,opt,name=key,proto3" json:"key,omitempty"` // version of the given key - Version int64 `protobuf:"varint,3,opt,name=version,proto3" json:"version,omitempty"` + Version int64 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"` // create index of the given key - CreateIndex int64 `protobuf:"varint,4,opt,name=create_index,proto3" json:"create_index,omitempty"` + CreateIndex int64 `protobuf:"varint,5,opt,name=create_index,proto3" json:"create_index,omitempty"` // last modified index of the given key - ModIndex int64 `protobuf:"varint,5,opt,name=mod_index,proto3" json:"mod_index,omitempty"` + ModIndex int64 `protobuf:"varint,6,opt,name=mod_index,proto3" json:"mod_index,omitempty"` // value of the given key - Value []byte `protobuf:"bytes,6,opt,name=value,proto3" json:"value,omitempty"` + Value []byte `protobuf:"bytes,7,opt,name=value,proto3" json:"value,omitempty"` } func (m *Compare) Reset() { *m = Compare{} } @@ -326,7 +353,8 @@ func (m *CompactionResponse) GetHeader() *ResponseHeader { } func init() { - proto.RegisterEnum("etcdserverpb.Compare_CompareType", Compare_CompareType_name, Compare_CompareType_value) + proto.RegisterEnum("etcdserverpb.Compare_CompareResult", Compare_CompareResult_name, Compare_CompareResult_value) + proto.RegisterEnum("etcdserverpb.Compare_CompareTarget", Compare_CompareTarget_name, Compare_CompareTarget_value) } func (m *ResponseHeader) Unmarshal(data []byte) error { l := len(data) @@ -1145,7 +1173,7 @@ func (m *ResponseUnion) Unmarshal(data []byte) error { switch fieldNum { case 1: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field ReponseRange", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field ResponseRange", wireType) } var msglen int for shift := uint(0); ; shift += 7 { @@ -1163,10 +1191,10 @@ func (m *ResponseUnion) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - if m.ReponseRange == nil { - m.ReponseRange = &RangeResponse{} + if m.ResponseRange == nil { + m.ResponseRange = &RangeResponse{} } - if err := m.ReponseRange.Unmarshal(data[iNdEx:postIndex]); err != nil { + if err := m.ResponseRange.Unmarshal(data[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex @@ -1268,7 +1296,7 @@ func (m *Compare) Unmarshal(data []byte) error { switch fieldNum { case 1: if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field Result", wireType) } for shift := uint(0); ; shift += 7 { if iNdEx >= l { @@ -1276,12 +1304,27 @@ func (m *Compare) Unmarshal(data []byte) error { } b := data[iNdEx] iNdEx++ - m.Type |= (Compare_CompareType(b) & 0x7F) << shift + m.Result |= (Compare_CompareResult(b) & 0x7F) << shift if b < 0x80 { break } } case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Target", wireType) + } + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Target |= (Compare_CompareTarget(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) } @@ -1303,7 +1346,7 @@ func (m *Compare) Unmarshal(data []byte) error { } m.Key = append([]byte{}, data[iNdEx:postIndex]...) iNdEx = postIndex - case 3: + case 4: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field Version", wireType) } @@ -1318,7 +1361,7 @@ func (m *Compare) Unmarshal(data []byte) error { break } } - case 4: + case 5: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field CreateIndex", wireType) } @@ -1333,7 +1376,7 @@ func (m *Compare) Unmarshal(data []byte) error { break } } - case 5: + case 6: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field ModIndex", wireType) } @@ -1348,7 +1391,7 @@ func (m *Compare) Unmarshal(data []byte) error { break } } - case 6: + case 7: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) } @@ -1979,8 +2022,8 @@ func (m *RequestUnion) Size() (n int) { func (m *ResponseUnion) Size() (n int) { var l int _ = l - if m.ReponseRange != nil { - l = m.ReponseRange.Size() + if m.ResponseRange != nil { + l = m.ResponseRange.Size() n += 1 + l + sovRpc(uint64(l)) } if m.ResponsePut != nil { @@ -1997,8 +2040,11 @@ func (m *ResponseUnion) Size() (n int) { func (m *Compare) Size() (n int) { var l int _ = l - if m.Type != 0 { - n += 1 + sovRpc(uint64(m.Type)) + if m.Result != 0 { + n += 1 + sovRpc(uint64(m.Result)) + } + if m.Target != 0 { + n += 1 + sovRpc(uint64(m.Target)) } if m.Key != nil { l = len(m.Key) @@ -2425,11 +2471,11 @@ func (m *ResponseUnion) MarshalTo(data []byte) (n int, err error) { _ = i var l int _ = l - if m.ReponseRange != nil { + if m.ResponseRange != nil { data[i] = 0xa i++ - i = encodeVarintRpc(data, i, uint64(m.ReponseRange.Size())) - n7, err := m.ReponseRange.MarshalTo(data[i:]) + i = encodeVarintRpc(data, i, uint64(m.ResponseRange.Size())) + n7, err := m.ResponseRange.MarshalTo(data[i:]) if err != nil { return 0, err } @@ -2473,37 +2519,42 @@ func (m *Compare) MarshalTo(data []byte) (n int, err error) { _ = i var l int _ = l - if m.Type != 0 { + if m.Result != 0 { data[i] = 0x8 i++ - i = encodeVarintRpc(data, i, uint64(m.Type)) + i = encodeVarintRpc(data, i, uint64(m.Result)) + } + if m.Target != 0 { + data[i] = 0x10 + i++ + i = encodeVarintRpc(data, i, uint64(m.Target)) } if m.Key != nil { if len(m.Key) > 0 { - data[i] = 0x12 + data[i] = 0x1a i++ i = encodeVarintRpc(data, i, uint64(len(m.Key))) i += copy(data[i:], m.Key) } } if m.Version != 0 { - data[i] = 0x18 + data[i] = 0x20 i++ i = encodeVarintRpc(data, i, uint64(m.Version)) } if m.CreateIndex != 0 { - data[i] = 0x20 + data[i] = 0x28 i++ i = encodeVarintRpc(data, i, uint64(m.CreateIndex)) } if m.ModIndex != 0 { - data[i] = 0x28 + data[i] = 0x30 i++ i = encodeVarintRpc(data, i, uint64(m.ModIndex)) } if m.Value != nil { if len(m.Value) > 0 { - data[i] = 0x32 + data[i] = 0x3a i++ i = encodeVarintRpc(data, i, uint64(len(m.Value))) i += copy(data[i:], m.Value) diff --git a/etcdserver/etcdserverpb/rpc.proto b/etcdserver/etcdserverpb/rpc.proto index 4c6bffecf..d90b2bfe8 100644 --- a/etcdserver/etcdserverpb/rpc.proto +++ b/etcdserver/etcdserverpb/rpc.proto @@ -4,6 +4,9 @@ package etcdserverpb; import "github.com/gogo/protobuf/gogoproto/gogo.proto"; import "github.com/coreos/etcd/storage/storagepb/kv.proto"; +option (gogoproto.marshaler_all) = true; +option (gogoproto.unmarshaler_all) = true; + // Interface exported by the server. service etcd { // Range gets the keys in the range from the store. @@ -88,30 +91,37 @@ message RequestUnion { message ResponseUnion { oneof response { - RangeResponse reponse_range = 1; + RangeResponse response_range = 1; PutResponse response_put = 2; DeleteRangeResponse response_delete_range = 3; } } message Compare { - enum CompareType { + enum CompareResult { EQUAL = 0; GREATER = 1; LESS = 2; } - CompareType type = 1; + enum CompareTarget { + VERSION = 0; + CREATE = 1; + MOD = 2; + VALUE= 3; + } + CompareResult result = 1; + CompareTarget target = 2; // key path - bytes key = 2; - oneof target { + bytes key = 3; + oneof target_union { // version of the given key - int64 version = 3; + int64 version = 4; // create index of the given key - int64 create_index = 4; + int64 create_index = 5; // last modified index of the given key - int64 mod_index = 5; + int64 mod_index = 6; // value of the given key - bytes value = 6; + bytes value = 7; } } diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go index 409a56c17..6c2235d28 100644 --- a/etcdserver/v3demo_server.go +++ b/etcdserver/v3demo_server.go @@ -15,9 +15,12 @@ package etcdserver import ( + "bytes" + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + dstorage "github.com/coreos/etcd/storage" ) type V3DemoServer interface { @@ -27,36 +30,135 @@ type V3DemoServer interface { func (s *EtcdServer) V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) proto.Message { switch { case r.Range != nil: - rr := r.Range - resp := &pb.RangeResponse{} - resp.Header = &pb.ResponseHeader{} - kvs, rev, err := s.kv.Range(rr.Key, rr.RangeEnd, rr.Limit, 0) - if err != nil { - panic("not handled error") + return doRange(s.kv, r.Range) + case r.Put != nil: + return doPut(s.kv, r.Put) + case r.DeleteRange != nil: + return doDeleteRange(s.kv, r.DeleteRange) + case r.Txn != nil: + var index int64 + rt := r.Txn + + ok := true + for _, c := range rt.Compare { + kvs, rev, err := s.kv.Range(c.Key, nil, 1, 0) + if err != nil { + ok = false + break + } + index = rev + kv := kvs[0] + + // -1 is less, 0 is equal, 1 is greater + var result int + switch c.Target { + case pb.Compare_VALUE: + result = bytes.Compare(kv.Value, c.Value) + case pb.Compare_CREATE: + result = compareInt64(kv.CreateIndex, c.CreateIndex) + case pb.Compare_MOD: + result = compareInt64(kv.ModIndex, c.ModIndex) + case pb.Compare_VERSION: + result = compareInt64(kv.Version, c.Version) + } + + switch c.Result { + case pb.Compare_EQUAL: + if result != 0 { + ok = false + } + case pb.Compare_GREATER: + if result != 1 { + ok = false + } + case pb.Compare_LESS: + if result != -1 { + ok = false + } + } + + if !ok { + break + } } - resp.Header.Index = rev - for i := range kvs { - resp.Kvs = append(resp.Kvs, &kvs[i]) + var reqs []*pb.RequestUnion + if ok { + reqs = rt.Success + } else { + reqs = rt.Failure } - return resp - case r.Put != nil: - rp := r.Put - resp := &pb.PutResponse{} - resp.Header = &pb.ResponseHeader{} - rev := s.kv.Put(rp.Key, rp.Value) - resp.Header.Index = rev - return resp - case r.DeleteRange != nil: - rd := r.DeleteRange - resp := &pb.DeleteRangeResponse{} - resp.Header = &pb.ResponseHeader{} - _, rev := s.kv.DeleteRange(rd.Key, rd.RangeEnd) - resp.Header.Index = rev - return resp - case r.Txn != nil: - panic("not implemented") + resps := make([]*pb.ResponseUnion, len(reqs)) + for i := range reqs { + resps[i] = doUnion(s.kv, reqs[i]) + } + if len(resps) != 0 { + index += 1 + } + + txnResp := &pb.TxnResponse{} + txnResp.Header = &pb.ResponseHeader{} + txnResp.Header.Index = index + txnResp.Responses = resps + txnResp.Succeeded = ok + return txnResp default: panic("not implemented") } } + +func compareInt64(a, b int64) int { + switch { + case a < b: + return -1 + case a > b: + return 1 + default: + return 0 + } +} + +func doPut(kv dstorage.KV, p *pb.PutRequest) *pb.PutResponse { + resp := &pb.PutResponse{} + resp.Header = &pb.ResponseHeader{} + rev := kv.Put(p.Key, p.Value) + resp.Header.Index = rev + return resp +} + +func doRange(kv dstorage.KV, r *pb.RangeRequest) *pb.RangeResponse { + resp := &pb.RangeResponse{} + resp.Header = &pb.ResponseHeader{} + kvs, rev, err := kv.Range(r.Key, r.RangeEnd, r.Limit, 0) + if err != nil { + panic("not handled error") + } + + resp.Header.Index = rev + for i := range kvs { + resp.Kvs = append(resp.Kvs, &kvs[i]) + } + return resp +} + +func doDeleteRange(kv dstorage.KV, dr *pb.DeleteRangeRequest) *pb.DeleteRangeResponse { + resp := &pb.DeleteRangeResponse{} + resp.Header = &pb.ResponseHeader{} + _, rev := kv.DeleteRange(dr.Key, dr.RangeEnd) + resp.Header.Index = rev + return resp +} + +func doUnion(kv dstorage.KV, union *pb.RequestUnion) *pb.ResponseUnion { + switch { + case union.RequestRange != nil: + return &pb.ResponseUnion{ResponseRange: doRange(kv, union.RequestRange)} + case union.RequestPut != nil: + return &pb.ResponseUnion{ResponsePut: doPut(kv, union.RequestPut)} + case union.RequestDeleteRange != nil: + return &pb.ResponseUnion{ResponseDeleteRange: doDeleteRange(kv, union.RequestDeleteRange)} + default: + // empty union + return nil + } +}