diff --git a/etcdctlv3/command/lease_command.go b/etcdctlv3/command/lease_command.go index c287653b4..25761ad54 100644 --- a/etcdctlv3/command/lease_command.go +++ b/etcdctlv3/command/lease_command.go @@ -16,8 +16,10 @@ package command import ( "fmt" + "io" "os" "strconv" + "time" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" @@ -34,6 +36,7 @@ func NewLeaseCommand() *cobra.Command { lc.AddCommand(NewLeaseCreateCommand()) lc.AddCommand(NewLeaseRevokeCommand()) + lc.AddCommand(NewLeaseKeepAliveCommand()) return lc } @@ -121,3 +124,69 @@ func leaseRevokeCommandFunc(cmd *cobra.Command, args []string) { } fmt.Printf("lease %016x revoked\n", id) } + +// NewLeaseKeepAliveCommand returns the cobra command for "lease keep-alive". +func NewLeaseKeepAliveCommand() *cobra.Command { + lc := &cobra.Command{ + Use: "keep-alive", + Short: "keep-alive is used to keep leases alive.", + + Run: leaseKeepAliveCommandFunc, + } + + return lc +} + +// leaseKeepAliveCommandFunc executes the "lease keep-alive" command. +func leaseKeepAliveCommandFunc(cmd *cobra.Command, args []string) { + if len(args) != 1 { + ExitWithError(ExitBadArgs, fmt.Errorf("lease keep-alive command needs lease ID as argument")) + } + + id, err := strconv.ParseInt(args[0], 16, 64) + if err != nil { + ExitWithError(ExitBadArgs, fmt.Errorf("bad lease ID arg (%v), expecting ID in Hex", err)) + } + + endpoint, err := cmd.Flags().GetString("endpoint") + if err != nil { + ExitWithError(ExitError, err) + } + conn, err := grpc.Dial(endpoint) + if err != nil { + ExitWithError(ExitBadConnection, err) + } + lease := pb.NewLeaseClient(conn) + kStream, err := lease.LeaseKeepAlive(context.TODO()) + if err != nil { + ExitWithError(ExitBadConnection, err) + } + + nextC := make(chan int64, 1) + go leaseKeepAliveRecvLoop(kStream, nextC) + + req := &pb.LeaseKeepAliveRequest{ID: id} + for { + err := kStream.Send(req) + if err != nil { + ExitWithError(ExitError, fmt.Errorf("failed to keep-alive lease (%v)", err)) + } + next := <-nextC + time.Sleep(time.Duration(next/2) * time.Second) + } +} + +func leaseKeepAliveRecvLoop(kStream pb.Lease_LeaseKeepAliveClient, nextC chan int64) { + for { + resp, err := kStream.Recv() + if err == io.EOF { + os.Exit(ExitSuccess) + } + if err != nil { + ExitWithError(ExitError, err) + } + + fmt.Printf("lease %016x keepalived with TTL(%d)\n", resp.ID, resp.TTL) + nextC <- resp.TTL + } +} diff --git a/etcdserver/api/v3rpc/lease.go b/etcdserver/api/v3rpc/lease.go index d8b90c237..a0a450bd9 100644 --- a/etcdserver/api/v3rpc/lease.go +++ b/etcdserver/api/v3rpc/lease.go @@ -15,9 +15,12 @@ package v3rpc import ( + "io" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/etcdserver" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/lease" ) type LeaseServer struct { @@ -41,5 +44,28 @@ func (ls *LeaseServer) LeaseRevoke(ctx context.Context, rr *pb.LeaseRevokeReques } func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error { - panic("not implemented") + for { + req, err := stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return err + } + + ttl, err := ls.le.LeaseRenew(lease.LeaseID(req.ID)) + if err != nil { + if err == lease.ErrLeaseNotFound { + return ErrLeaseNotFound + } + // TODO: handle not primary error by forwarding renew requests to leader + panic("TODO: handle not primary error by forwarding renew requests to leader") + } + + resp := &pb.LeaseKeepAliveResponse{ID: req.ID, TTL: ttl} + err = stream.Send(resp) + if err != nil { + return err + } + } } diff --git a/etcdserver/etcdserverpb/rpc.pb.go b/etcdserver/etcdserverpb/rpc.pb.go index b8acf3c60..7cf320980 100644 --- a/etcdserver/etcdserverpb/rpc.pb.go +++ b/etcdserver/etcdserverpb/rpc.pb.go @@ -500,7 +500,8 @@ func (*LeaseKeepAliveRequest) ProtoMessage() {} type LeaseKeepAliveResponse struct { Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"` - TTL int64 `protobuf:"varint,2,opt,proto3" json:"TTL,omitempty"` + ID int64 `protobuf:"varint,2,opt,proto3" json:"ID,omitempty"` + TTL int64 `protobuf:"varint,3,opt,proto3" json:"TTL,omitempty"` } func (m *LeaseKeepAliveResponse) Reset() { *m = LeaseKeepAliveResponse{} } @@ -1891,9 +1892,14 @@ func (m *LeaseKeepAliveResponse) MarshalTo(data []byte) (int, error) { } i += n17 } - if m.TTL != 0 { + if m.ID != 0 { data[i] = 0x10 i++ + i = encodeVarintRpc(data, i, uint64(m.ID)) + } + if m.TTL != 0 { + data[i] = 0x18 + i++ i = encodeVarintRpc(data, i, uint64(m.TTL)) } return i, nil @@ -2313,6 +2319,9 @@ func (m *LeaseKeepAliveResponse) Size() (n int) { l = m.Header.Size() n += 1 + l + sovRpc(uint64(l)) } + if m.ID != 0 { + n += 1 + sovRpc(uint64(m.ID)) + } if m.TTL != 0 { n += 1 + sovRpc(uint64(m.TTL)) } @@ -4762,6 +4771,22 @@ func (m *LeaseKeepAliveResponse) Unmarshal(data []byte) error { } iNdEx = postIndex case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ID", wireType) + } + m.ID = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.ID |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field TTL", wireType) } diff --git a/etcdserver/etcdserverpb/rpc.proto b/etcdserver/etcdserverpb/rpc.proto index e8b468ec5..6b4ed58df 100644 --- a/etcdserver/etcdserverpb/rpc.proto +++ b/etcdserver/etcdserverpb/rpc.proto @@ -268,5 +268,6 @@ message LeaseKeepAliveRequest { message LeaseKeepAliveResponse { ResponseHeader header = 1; - int64 TTL = 2; + int64 ID = 2; + int64 TTL = 3; } diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go index 28b34af41..1d408cb6e 100644 --- a/etcdserver/v3demo_server.go +++ b/etcdserver/v3demo_server.go @@ -35,8 +35,14 @@ type RaftKV interface { } type Lessor interface { + // LeaseCreate sends LeaseCreate request to raft and apply it after committed. LeaseCreate(ctx context.Context, r *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) + // LeaseRevoke sends LeaseRevoke request to raft and apply it after committed. LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) + + // LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error + // is returned. + LeaseRenew(id lease.LeaseID) (int64, error) } func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { @@ -95,6 +101,10 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) return result.resp.(*pb.LeaseRevokeResponse), result.err } +func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) { + return s.lessor.Renew(id) +} + type applyResult struct { resp proto.Message err error diff --git a/lease/lessor.go b/lease/lessor.go index 7aac5b9e4..2027fa479 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -79,9 +79,9 @@ type Lessor interface { // Demote demotes the lessor from being the primary lessor. Demote() - // Renew renews a lease with given ID. If the ID does not exist, an error - // will be returned. - Renew(id LeaseID) error + // Renew renews a lease with given ID. It returns the renewed TTL. If the ID does not exist, + // an error will be returned. + Renew(id LeaseID) (int64, error) // ExpiredLeasesC returens a chan that is used to receive expired leases. ExpiredLeasesC() <-chan []*Lease @@ -209,22 +209,22 @@ func (le *lessor) Revoke(id LeaseID) error { // Renew renews an existing lease. If the given lease does not exist or // has expired, an error will be returned. -// TODO: return new TTL? -func (le *lessor) Renew(id LeaseID) error { +func (le *lessor) Renew(id LeaseID) (int64, error) { le.mu.Lock() defer le.mu.Unlock() if !le.primary { - return ErrNotPrimary + // forward renew request to primary instead of returning error. + return -1, ErrNotPrimary } l := le.leaseMap[id] if l == nil { - return ErrLeaseNotFound + return -1, ErrLeaseNotFound } l.refresh() - return nil + return l.TTL, nil } func (le *lessor) Promote() { @@ -438,6 +438,6 @@ func (fl *FakeLessor) Promote() {} func (fl *FakeLessor) Demote() {} -func (fl *FakeLessor) Renew(id LeaseID) error { return nil } +func (fl *FakeLessor) Renew(id LeaseID) (int64, error) { return 10, nil } func (fl *FakeLessor) ExpiredLeasesC() <-chan []*Lease { return nil } diff --git a/lease/lessor_test.go b/lease/lessor_test.go index 1a8ffc376..ebc0a4d9d 100644 --- a/lease/lessor_test.go +++ b/lease/lessor_test.go @@ -122,12 +122,15 @@ func TestLessorRenew(t *testing.T) { // manually change the ttl field l.TTL = 10 - err := le.Renew(l.ID) + ttl, err := le.Renew(l.ID) if err != nil { t.Fatalf("failed to renew lease (%v)", err) } - l = le.get(l.ID) + if ttl != l.TTL { + t.Errorf("ttl = %d, want %d", ttl, l.TTL) + } + l = le.get(l.ID) if l.expiry.Sub(time.Now()) < 9*time.Second { t.Errorf("failed to renew the lease") }