diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index a85711cba..14d6f0234 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -321,7 +321,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { if cfg.v3demo { // set up v3 demo rpc grpcServer := grpc.NewServer() - etcdserverpb.RegisterKVServer(grpcServer, v3rpc.New(s)) + etcdserverpb.RegisterKVServer(grpcServer, v3rpc.NewKVServer(s)) etcdserverpb.RegisterWatchServer(grpcServer, v3rpc.NewWatchServer(s.Watchable())) go plog.Fatal(grpcServer.Serve(v3l)) } diff --git a/etcdserver/api/v3rpc/key.go b/etcdserver/api/v3rpc/key.go index a8bd355ae..cf4bd4129 100644 --- a/etcdserver/api/v3rpc/key.go +++ b/etcdserver/api/v3rpc/key.go @@ -23,73 +23,73 @@ import ( "github.com/coreos/etcd/storage" ) -type handler struct { - server etcdserver.V3DemoServer +type kvServer struct { + kv etcdserver.RaftKV } -func New(s etcdserver.V3DemoServer) pb.KVServer { - return &handler{s} +func NewKVServer(s etcdserver.RaftKV) pb.KVServer { + return &kvServer{s} } -func (h *handler) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { +func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { if err := checkRangeRequest(r); err != nil { return nil, err } - resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Range: r}) + resp, err := s.kv.Range(ctx, r) if err != nil { return nil, togRPCError(err) } - return resp.(*pb.RangeResponse), err + return resp, err } -func (h *handler) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { +func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { if err := checkPutRequest(r); err != nil { return nil, err } - resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Put: r}) + resp, err := s.kv.Put(ctx, r) if err != nil { return nil, togRPCError(err) } - return resp.(*pb.PutResponse), err + return resp, err } -func (h *handler) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { +func (s *kvServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { if err := checkDeleteRequest(r); err != nil { return nil, err } - resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{DeleteRange: r}) + resp, err := s.kv.DeleteRange(ctx, r) if err != nil { return nil, togRPCError(err) } - return resp.(*pb.DeleteRangeResponse), err + return resp, err } -func (h *handler) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { +func (s *kvServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { if err := checkTxnRequest(r); err != nil { return nil, err } - resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Txn: r}) + resp, err := s.kv.Txn(ctx, r) if err != nil { return nil, togRPCError(err) } - return resp.(*pb.TxnResponse), err + return resp, err } -func (h *handler) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { - resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Compaction: r}) +func (s *kvServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { + resp, err := s.kv.Compact(ctx, r) if err != nil { return nil, togRPCError(err) } - return resp.(*pb.CompactionResponse), nil + return resp, nil } func checkRangeRequest(r *pb.RangeRequest) error { diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go index 52d6874ab..be0d803a7 100644 --- a/etcdserver/v3demo_server.go +++ b/etcdserver/v3demo_server.go @@ -23,8 +23,52 @@ import ( dstorage "github.com/coreos/etcd/storage" ) -type V3DemoServer interface { - V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) +type RaftKV interface { + Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) + Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) + DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) + Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) + Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) +} + +func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { + result, err := s.processInternalRaftReq(ctx, pb.InternalRaftRequest{Range: r}) + if err != nil { + return nil, err + } + return result.resp.(*pb.RangeResponse), result.err +} + +func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { + result, err := s.processInternalRaftReq(ctx, pb.InternalRaftRequest{Put: r}) + if err != nil { + return nil, err + } + return result.resp.(*pb.PutResponse), result.err +} + +func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { + result, err := s.processInternalRaftReq(ctx, pb.InternalRaftRequest{DeleteRange: r}) + if err != nil { + return nil, err + } + return result.resp.(*pb.DeleteRangeResponse), result.err +} + +func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { + result, err := s.processInternalRaftReq(ctx, pb.InternalRaftRequest{Txn: r}) + if err != nil { + return nil, err + } + return result.resp.(*pb.TxnResponse), result.err +} + +func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { + result, err := s.processInternalRaftReq(ctx, pb.InternalRaftRequest{Compaction: r}) + if err != nil { + return nil, err + } + return result.resp.(*pb.CompactionResponse), result.err } type applyResult struct { @@ -32,12 +76,12 @@ type applyResult struct { err error } -func (s *EtcdServer) V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) { +func (s *EtcdServer) processInternalRaftReq(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) { r.ID = s.reqIDGen.Next() data, err := r.Marshal() if err != nil { - return &pb.EmptyResponse{}, err + return nil, err } ch := s.w.Register(r.ID) @@ -45,13 +89,12 @@ func (s *EtcdServer) V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) (pr select { case x := <-ch: - result := x.(*applyResult) - return result.resp, result.err + return x.(*applyResult), nil case <-ctx.Done(): s.w.Trigger(r.ID, nil) // GC wait - return &pb.EmptyResponse{}, ctx.Err() + return nil, ctx.Err() case <-s.done: - return &pb.EmptyResponse{}, ErrStopped + return nil, ErrStopped } }