Merge pull request #3809 from xiang90/rpc_kv

*: refactor kv rpc implementation
This commit is contained in:
Xiang Li 2015-11-04 19:05:48 -08:00
commit 08f0d94019
3 changed files with 71 additions and 28 deletions

View File

@ -321,7 +321,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
if cfg.v3demo {
// set up v3 demo rpc
grpcServer := grpc.NewServer()
etcdserverpb.RegisterKVServer(grpcServer, v3rpc.New(s))
etcdserverpb.RegisterKVServer(grpcServer, v3rpc.NewKVServer(s))
etcdserverpb.RegisterWatchServer(grpcServer, v3rpc.NewWatchServer(s.Watchable()))
go plog.Fatal(grpcServer.Serve(v3l))
}

View File

@ -23,73 +23,73 @@ import (
"github.com/coreos/etcd/storage"
)
type handler struct {
server etcdserver.V3DemoServer
type kvServer struct {
kv etcdserver.RaftKV
}
func New(s etcdserver.V3DemoServer) pb.KVServer {
return &handler{s}
func NewKVServer(s etcdserver.RaftKV) pb.KVServer {
return &kvServer{s}
}
func (h *handler) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
if err := checkRangeRequest(r); err != nil {
return nil, err
}
resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Range: r})
resp, err := s.kv.Range(ctx, r)
if err != nil {
return nil, togRPCError(err)
}
return resp.(*pb.RangeResponse), err
return resp, err
}
func (h *handler) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
if err := checkPutRequest(r); err != nil {
return nil, err
}
resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Put: r})
resp, err := s.kv.Put(ctx, r)
if err != nil {
return nil, togRPCError(err)
}
return resp.(*pb.PutResponse), err
return resp, err
}
func (h *handler) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
func (s *kvServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
if err := checkDeleteRequest(r); err != nil {
return nil, err
}
resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{DeleteRange: r})
resp, err := s.kv.DeleteRange(ctx, r)
if err != nil {
return nil, togRPCError(err)
}
return resp.(*pb.DeleteRangeResponse), err
return resp, err
}
func (h *handler) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
func (s *kvServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
if err := checkTxnRequest(r); err != nil {
return nil, err
}
resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Txn: r})
resp, err := s.kv.Txn(ctx, r)
if err != nil {
return nil, togRPCError(err)
}
return resp.(*pb.TxnResponse), err
return resp, err
}
func (h *handler) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Compaction: r})
func (s *kvServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
resp, err := s.kv.Compact(ctx, r)
if err != nil {
return nil, togRPCError(err)
}
return resp.(*pb.CompactionResponse), nil
return resp, nil
}
func checkRangeRequest(r *pb.RangeRequest) error {

View File

@ -23,8 +23,52 @@ import (
dstorage "github.com/coreos/etcd/storage"
)
type V3DemoServer interface {
V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error)
type RaftKV interface {
Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error)
Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error)
Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
}
func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
result, err := s.processInternalRaftReq(ctx, pb.InternalRaftRequest{Range: r})
if err != nil {
return nil, err
}
return result.resp.(*pb.RangeResponse), result.err
}
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
result, err := s.processInternalRaftReq(ctx, pb.InternalRaftRequest{Put: r})
if err != nil {
return nil, err
}
return result.resp.(*pb.PutResponse), result.err
}
func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
result, err := s.processInternalRaftReq(ctx, pb.InternalRaftRequest{DeleteRange: r})
if err != nil {
return nil, err
}
return result.resp.(*pb.DeleteRangeResponse), result.err
}
func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
result, err := s.processInternalRaftReq(ctx, pb.InternalRaftRequest{Txn: r})
if err != nil {
return nil, err
}
return result.resp.(*pb.TxnResponse), result.err
}
func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
result, err := s.processInternalRaftReq(ctx, pb.InternalRaftRequest{Compaction: r})
if err != nil {
return nil, err
}
return result.resp.(*pb.CompactionResponse), result.err
}
type applyResult struct {
@ -32,12 +76,12 @@ type applyResult struct {
err error
}
func (s *EtcdServer) V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
func (s *EtcdServer) processInternalRaftReq(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
r.ID = s.reqIDGen.Next()
data, err := r.Marshal()
if err != nil {
return &pb.EmptyResponse{}, err
return nil, err
}
ch := s.w.Register(r.ID)
@ -45,13 +89,12 @@ func (s *EtcdServer) V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) (pr
select {
case x := <-ch:
result := x.(*applyResult)
return result.resp, result.err
return x.(*applyResult), nil
case <-ctx.Done():
s.w.Trigger(r.ID, nil) // GC wait
return &pb.EmptyResponse{}, ctx.Err()
return nil, ctx.Err()
case <-s.done:
return &pb.EmptyResponse{}, ErrStopped
return nil, ErrStopped
}
}