From 4c81615cef07bd6b5edf9762f983303374e16732 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 13 Sep 2015 08:32:01 -0700 Subject: [PATCH] etcdserver: initial support for cluster-wide v3 request --- etcdserver/api/v3rpc/key.go | 16 +++++------ etcdserver/server.go | 2 ++ etcdserver/v3demo_server.go | 57 ++++++++++++++++++++++++++----------- 3 files changed, 50 insertions(+), 25 deletions(-) diff --git a/etcdserver/api/v3rpc/key.go b/etcdserver/api/v3rpc/key.go index 93891ade0..57f83a5ea 100644 --- a/etcdserver/api/v3rpc/key.go +++ b/etcdserver/api/v3rpc/key.go @@ -29,23 +29,23 @@ func New(s etcdserver.V3DemoServer) pb.EtcdServer { } func (h *handler) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { - resp := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Range: r}) - return resp.(*pb.RangeResponse), nil + resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Range: r}) + return resp.(*pb.RangeResponse), err } func (h *handler) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { - resp := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Put: r}) - return resp.(*pb.PutResponse), nil + resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Put: r}) + return resp.(*pb.PutResponse), err } func (h *handler) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { - resp := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{DeleteRange: r}) - return resp.(*pb.DeleteRangeResponse), nil + resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{DeleteRange: r}) + return resp.(*pb.DeleteRangeResponse), err } func (h *handler) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { - resp := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Txn: r}) - return resp.(*pb.TxnResponse), nil + resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Txn: r}) + return resp.(*pb.TxnResponse), err } func (h *handler) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { diff --git a/etcdserver/server.go b/etcdserver/server.go index 61d4c8aae..9a5dbe534 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -770,6 +770,8 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint case raftReq.V2 != nil: req := raftReq.V2 s.w.Trigger(req.ID, s.applyRequest(*req)) + default: + s.w.Trigger(raftReq.ID, s.applyV3Request(&raftReq)) } } case raftpb.EntryConfChange: diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go index 24fb7382b..957ac5cbc 100644 --- a/etcdserver/v3demo_server.go +++ b/etcdserver/v3demo_server.go @@ -24,25 +24,48 @@ import ( ) type V3DemoServer interface { - V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) proto.Message + V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) } -func (s *EtcdServer) V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) proto.Message { +func (s *EtcdServer) V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) { + r.ID = s.reqIDGen.Next() + + data, err := r.Marshal() + if err != nil { + return &pb.EmptyResponse{}, err + } + ch := s.w.Register(r.ID) + + s.r.Propose(ctx, data) + + select { + case x := <-ch: + resp := x.(proto.Message) + return resp, nil + case <-ctx.Done(): + s.w.Trigger(r.ID, nil) // GC wait + return &pb.EmptyResponse{}, ctx.Err() + case <-s.done: + return &pb.EmptyResponse{}, ErrStopped + } +} + +func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) interface{} { switch { case r.Range != nil: - return doRange(s.kv, r.Range) + return applyRange(s.kv, r.Range) case r.Put != nil: - return doPut(s.kv, r.Put) + return applyPut(s.kv, r.Put) case r.DeleteRange != nil: - return doDeleteRange(s.kv, r.DeleteRange) + return applyDeleteRange(s.kv, r.DeleteRange) case r.Txn != nil: - return doTxn(s.kv, r.Txn) + return applyTxn(s.kv, r.Txn) default: panic("not implemented") } } -func doPut(kv dstorage.KV, p *pb.PutRequest) *pb.PutResponse { +func applyPut(kv dstorage.KV, p *pb.PutRequest) *pb.PutResponse { resp := &pb.PutResponse{} resp.Header = &pb.ResponseHeader{} rev := kv.Put(p.Key, p.Value) @@ -50,7 +73,7 @@ func doPut(kv dstorage.KV, p *pb.PutRequest) *pb.PutResponse { return resp } -func doRange(kv dstorage.KV, r *pb.RangeRequest) *pb.RangeResponse { +func applyRange(kv dstorage.KV, r *pb.RangeRequest) *pb.RangeResponse { resp := &pb.RangeResponse{} resp.Header = &pb.ResponseHeader{} kvs, rev, err := kv.Range(r.Key, r.RangeEnd, r.Limit, 0) @@ -65,7 +88,7 @@ func doRange(kv dstorage.KV, r *pb.RangeRequest) *pb.RangeResponse { return resp } -func doDeleteRange(kv dstorage.KV, dr *pb.DeleteRangeRequest) *pb.DeleteRangeResponse { +func applyDeleteRange(kv dstorage.KV, dr *pb.DeleteRangeRequest) *pb.DeleteRangeResponse { resp := &pb.DeleteRangeResponse{} resp.Header = &pb.ResponseHeader{} _, rev := kv.DeleteRange(dr.Key, dr.RangeEnd) @@ -73,12 +96,12 @@ func doDeleteRange(kv dstorage.KV, dr *pb.DeleteRangeRequest) *pb.DeleteRangeRes return resp } -func doTxn(kv dstorage.KV, rt *pb.TxnRequest) *pb.TxnResponse { +func applyTxn(kv dstorage.KV, rt *pb.TxnRequest) *pb.TxnResponse { var revision int64 ok := true for _, c := range rt.Compare { - if revision, ok = doCompare(kv, c); !ok { + if revision, ok = applyCompare(kv, c); !ok { break } } @@ -91,7 +114,7 @@ func doTxn(kv dstorage.KV, rt *pb.TxnRequest) *pb.TxnResponse { } resps := make([]*pb.ResponseUnion, len(reqs)) for i := range reqs { - resps[i] = doUnion(kv, reqs[i]) + resps[i] = applyUnion(kv, reqs[i]) } if len(resps) != 0 { revision += 1 @@ -105,21 +128,21 @@ func doTxn(kv dstorage.KV, rt *pb.TxnRequest) *pb.TxnResponse { return txnResp } -func doUnion(kv dstorage.KV, union *pb.RequestUnion) *pb.ResponseUnion { +func applyUnion(kv dstorage.KV, union *pb.RequestUnion) *pb.ResponseUnion { switch { case union.RequestRange != nil: - return &pb.ResponseUnion{ResponseRange: doRange(kv, union.RequestRange)} + return &pb.ResponseUnion{ResponseRange: applyRange(kv, union.RequestRange)} case union.RequestPut != nil: - return &pb.ResponseUnion{ResponsePut: doPut(kv, union.RequestPut)} + return &pb.ResponseUnion{ResponsePut: applyPut(kv, union.RequestPut)} case union.RequestDeleteRange != nil: - return &pb.ResponseUnion{ResponseDeleteRange: doDeleteRange(kv, union.RequestDeleteRange)} + return &pb.ResponseUnion{ResponseDeleteRange: applyDeleteRange(kv, union.RequestDeleteRange)} default: // empty union return nil } } -func doCompare(kv dstorage.KV, c *pb.Compare) (int64, bool) { +func applyCompare(kv dstorage.KV, c *pb.Compare) (int64, bool) { ckvs, rev, err := kv.Range(c.Key, nil, 1, 0) if err != nil { return rev, false