diff --git a/etcdserver/api/v3rpc/key.go b/etcdserver/api/v3rpc/key.go index b2af39908..5376eb1ef 100644 --- a/etcdserver/api/v3rpc/key.go +++ b/etcdserver/api/v3rpc/key.go @@ -44,8 +44,8 @@ func (h *handler) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*p } func (h *handler) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { - panic("not implemented") - return nil, nil + resp := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Txn: r}) + return resp.(*pb.TxnResponse), nil } func (h *handler) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go index 409a56c17..6c2235d28 100644 --- a/etcdserver/v3demo_server.go +++ b/etcdserver/v3demo_server.go @@ -15,9 +15,12 @@ package etcdserver import ( + "bytes" + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + dstorage "github.com/coreos/etcd/storage" ) type V3DemoServer interface { @@ -27,36 +30,135 @@ type V3DemoServer interface { func (s *EtcdServer) V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) proto.Message { switch { case r.Range != nil: - rr := r.Range - resp := &pb.RangeResponse{} - resp.Header = &pb.ResponseHeader{} - kvs, rev, err := s.kv.Range(rr.Key, rr.RangeEnd, rr.Limit, 0) - if err != nil { - panic("not handled error") + return doRange(s.kv, r.Range) + case r.Put != nil: + return doPut(s.kv, r.Put) + case r.DeleteRange != nil: + return doDeleteRange(s.kv, r.DeleteRange) + case r.Txn != nil: + var index int64 + rt := r.Txn + + ok := true + for _, c := range rt.Compare { + kvs, rev, err := s.kv.Range(c.Key, nil, 1, 0) + if err != nil { + ok = false + break + } + index = rev + kv := kvs[0] + + // -1 is less, 0 is equal, 1 is greater + var result int + switch c.Target { + case pb.Compare_VALUE: + result = bytes.Compare(kv.Value, c.Value) + case pb.Compare_CREATE: + result = compareInt64(kv.CreateIndex, c.CreateIndex) + case pb.Compare_MOD: + result = compareInt64(kv.ModIndex, c.ModIndex) + case pb.Compare_VERSION: + result = compareInt64(kv.Version, c.Version) + } + + switch c.Result { + case pb.Compare_EQUAL: + if result != 0 { + ok = false + } + case pb.Compare_GREATER: + if result != 1 { + ok = false + } + case pb.Compare_LESS: + if result != -1 { + ok = false + } + } + + if !ok { + break + } } - resp.Header.Index = rev - for i := range kvs { - resp.Kvs = append(resp.Kvs, &kvs[i]) + var reqs []*pb.RequestUnion + if ok { + reqs = rt.Success + } else { + reqs = rt.Failure } - return resp - case r.Put != nil: - rp := r.Put - resp := &pb.PutResponse{} - resp.Header = &pb.ResponseHeader{} - rev := s.kv.Put(rp.Key, rp.Value) - resp.Header.Index = rev - return resp - case r.DeleteRange != nil: - rd := r.DeleteRange - resp := &pb.DeleteRangeResponse{} - resp.Header = &pb.ResponseHeader{} - _, rev := s.kv.DeleteRange(rd.Key, rd.RangeEnd) - resp.Header.Index = rev - return resp - case r.Txn != nil: - panic("not implemented") + resps := make([]*pb.ResponseUnion, len(reqs)) + for i := range reqs { + resps[i] = doUnion(s.kv, reqs[i]) + } + if len(resps) != 0 { + index += 1 + } + + txnResp := &pb.TxnResponse{} + txnResp.Header = &pb.ResponseHeader{} + txnResp.Header.Index = index + txnResp.Responses = resps + txnResp.Succeeded = ok + return txnResp default: panic("not implemented") } } + +func compareInt64(a, b int64) int { + switch { + case a < b: + return -1 + case a > b: + return 1 + default: + return 0 + } +} + +func doPut(kv dstorage.KV, p *pb.PutRequest) *pb.PutResponse { + resp := &pb.PutResponse{} + resp.Header = &pb.ResponseHeader{} + rev := kv.Put(p.Key, p.Value) + resp.Header.Index = rev + return resp +} + +func doRange(kv dstorage.KV, r *pb.RangeRequest) *pb.RangeResponse { + resp := &pb.RangeResponse{} + resp.Header = &pb.ResponseHeader{} + kvs, rev, err := kv.Range(r.Key, r.RangeEnd, r.Limit, 0) + if err != nil { + panic("not handled error") + } + + resp.Header.Index = rev + for i := range kvs { + resp.Kvs = append(resp.Kvs, &kvs[i]) + } + return resp +} + +func doDeleteRange(kv dstorage.KV, dr *pb.DeleteRangeRequest) *pb.DeleteRangeResponse { + resp := &pb.DeleteRangeResponse{} + resp.Header = &pb.ResponseHeader{} + _, rev := kv.DeleteRange(dr.Key, dr.RangeEnd) + resp.Header.Index = rev + return resp +} + +func doUnion(kv dstorage.KV, union *pb.RequestUnion) *pb.ResponseUnion { + switch { + case union.RequestRange != nil: + return &pb.ResponseUnion{ResponseRange: doRange(kv, union.RequestRange)} + case union.RequestPut != nil: + return &pb.ResponseUnion{ResponsePut: doPut(kv, union.RequestPut)} + case union.RequestDeleteRange != nil: + return &pb.ResponseUnion{ResponseDeleteRange: doDeleteRange(kv, union.RequestDeleteRange)} + default: + // empty union + return nil + } +}