From 63b0cd470d8f4a1cf7b570ab7db2f33814fc0935 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Fri, 9 Sep 2016 08:14:14 +0900 Subject: [PATCH] etcdserver: implement 'LeaseTimeToLive' --- etcdserver/api/v3rpc/lease.go | 9 +++++ etcdserver/v3_server.go | 76 +++++++++++++++++++++++++++++------ 2 files changed, 73 insertions(+), 12 deletions(-) diff --git a/etcdserver/api/v3rpc/lease.go b/etcdserver/api/v3rpc/lease.go index 108e1387a..0ebffadb4 100644 --- a/etcdserver/api/v3rpc/lease.go +++ b/etcdserver/api/v3rpc/lease.go @@ -54,6 +54,15 @@ func (ls *LeaseServer) LeaseRevoke(ctx context.Context, rr *pb.LeaseRevokeReques return resp, nil } +func (ls *LeaseServer) LeaseTimeToLive(ctx context.Context, rr *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) { + resp, err := ls.le.LeaseTimeToLive(ctx, rr) + if err != nil { + return nil, rpctypes.ErrGRPCLeaseNotFound + } + ls.hdr.fill(resp.Header) + return resp, nil +} + func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error { for { req, err := stream.Recv() diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index 3e041d239..395f75deb 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -21,8 +21,10 @@ import ( "github.com/coreos/etcd/auth" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/lease" "github.com/coreos/etcd/lease/leasehttp" + "github.com/coreos/etcd/lease/leasepb" "github.com/coreos/etcd/mvcc" "golang.org/x/net/context" "google.golang.org/grpc/metadata" @@ -59,6 +61,9 @@ type Lessor interface { // LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error // is returned. LeaseRenew(id lease.LeaseID) (int64, error) + + // LeaseTimeToLive retrieves lease information. + LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) } type Authenticator interface { @@ -219,7 +224,7 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) { ttl, err := s.lessor.Renew(id) - if err == nil { + if err == nil { // already requested to primary lessor(leader) return ttl, nil } if err != lease.ErrNotPrimary { @@ -227,6 +232,61 @@ func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) { } // renewals don't go through raft; forward to leader manually + leader, err := s.waitLeader() + if err != nil { + return -1, err + } + + for _, url := range leader.PeerURLs { + lurl := url + leasehttp.LeasePrefix + ttl, err = leasehttp.RenewHTTP(id, lurl, s.peerRt, s.Cfg.peerDialTimeout()) + if err == nil { + break + } + } + return ttl, err +} + +func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) { + if s.Leader() == s.ID() { + // primary; timetolive directly from leader + le := s.lessor.Lookup(lease.LeaseID(r.ID)) + if le == nil { + return nil, lease.ErrLeaseNotFound + } + // TODO: fill out ResponseHeader + resp := &pb.LeaseTimeToLiveResponse{Header: &pb.ResponseHeader{}, ID: r.ID, TTL: int64(le.Remaining().Seconds()), GrantedTTL: le.TTL} + if r.Keys { + ks := le.Keys() + kbs := make([][]byte, len(ks)) + for i := range ks { + kbs[i] = []byte(ks[i]) + } + resp.Keys = kbs + } + return resp, nil + } + + // manually request to leader + leader, err := s.waitLeader() + if err != nil { + return nil, err + } + + var lresp *pb.LeaseTimeToLiveResponse + for _, url := range leader.PeerURLs { + lurl := url + leasehttp.LeaseInternalPrefix + var iresp *leasepb.LeaseInternalResponse + iresp, err = leasehttp.TimeToLiveHTTP(ctx, lease.LeaseID(r.ID), r.Keys, lurl, s.peerRt) + if err == nil { + lresp = iresp.LeaseTimeToLiveResponse + break + } + } + return lresp, nil +} + +func (s *EtcdServer) waitLeader() (*membership.Member, error) { leader := s.cluster.Member(s.Leader()) for i := 0; i < 5 && leader == nil; i++ { // wait an election @@ -235,21 +295,13 @@ func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) { case <-time.After(dur): leader = s.cluster.Member(s.Leader()) case <-s.done: - return -1, ErrStopped + return nil, ErrStopped } } if leader == nil || len(leader.PeerURLs) == 0 { - return -1, ErrNoLeader + return nil, ErrNoLeader } - - for _, url := range leader.PeerURLs { - lurl := url + "/leases" - ttl, err = leasehttp.RenewHTTP(id, lurl, s.peerRt, s.Cfg.peerDialTimeout()) - if err == nil { - break - } - } - return ttl, err + return leader, nil } func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) {