diff --git a/Documentation/dev-guide/api_reference_v3.md b/Documentation/dev-guide/api_reference_v3.md index 63a09432d..b0a0ffd38 100644 --- a/Documentation/dev-guide/api_reference_v3.md +++ b/Documentation/dev-guide/api_reference_v3.md @@ -821,6 +821,22 @@ From google paxosdb paper: Our implementation hinges around a powerful primitive +##### message `LeaseInternalRequest` (lease/leasepb/lease.proto) + +| Field | Description | Type | +| ----- | ----------- | ---- | +| LeaseTimeToLiveRequest | | etcdserverpb.LeaseTimeToLiveRequest | + + + +##### message `LeaseInternalResponse` (lease/leasepb/lease.proto) + +| Field | Description | Type | +| ----- | ----------- | ---- | +| LeaseTimeToLiveResponse | | etcdserverpb.LeaseTimeToLiveResponse | + + + ##### message `Permission` (auth/authpb/auth.proto) Permission is a single entity diff --git a/lease/leasehttp/http.go b/lease/leasehttp/http.go index cf95595b6..c82b1dafe 100644 --- a/lease/leasehttp/http.go +++ b/lease/leasehttp/http.go @@ -23,6 +23,14 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/lease" + "github.com/coreos/etcd/lease/leasepb" + "github.com/coreos/etcd/pkg/httputil" + "golang.org/x/net/context" +) + +var ( + LeasePrefix = "/leases" + LeaseInternalPrefix = "/leases/internal" ) // NewHandler returns an http Handler for lease renewals @@ -44,28 +52,70 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - lreq := pb.LeaseKeepAliveRequest{} - if err := lreq.Unmarshal(b); err != nil { - http.Error(w, "error unmarshalling request", http.StatusBadRequest) - return - } + var v []byte + switch r.URL.Path { + case LeasePrefix: + lreq := pb.LeaseKeepAliveRequest{} + if err := lreq.Unmarshal(b); err != nil { + http.Error(w, "error unmarshalling request", http.StatusBadRequest) + return + } + ttl, err := h.l.Renew(lease.LeaseID(lreq.ID)) + if err != nil { + if err == lease.ErrLeaseNotFound { + http.Error(w, err.Error(), http.StatusNotFound) + return + } - ttl, err := h.l.Renew(lease.LeaseID(lreq.ID)) - if err != nil { - if err == lease.ErrLeaseNotFound { - http.Error(w, err.Error(), http.StatusNotFound) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + // TODO: fill out ResponseHeader + resp := &pb.LeaseKeepAliveResponse{ID: lreq.ID, TTL: ttl} + v, err = resp.Marshal() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) return } - http.Error(w, err.Error(), http.StatusBadRequest) - return - } + case LeaseInternalPrefix: + lreq := leasepb.LeaseInternalRequest{} + if err := lreq.Unmarshal(b); err != nil { + http.Error(w, "error unmarshalling request", http.StatusBadRequest) + return + } - // TODO: fill out ResponseHeader - resp := &pb.LeaseKeepAliveResponse{ID: lreq.ID, TTL: ttl} - v, err := resp.Marshal() - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID)) + if l == nil { + http.Error(w, lease.ErrLeaseNotFound.Error(), http.StatusNotFound) + return + } + // TODO: fill out ResponseHeader + resp := &leasepb.LeaseInternalResponse{ + LeaseTimeToLiveResponse: &pb.LeaseTimeToLiveResponse{ + Header: &pb.ResponseHeader{}, + ID: lreq.LeaseTimeToLiveRequest.ID, + TTL: int64(l.Remaining().Seconds()), + GrantedTTL: l.TTL, + }, + } + if lreq.LeaseTimeToLiveRequest.Keys { + ks := l.Keys() + kbs := make([][]byte, len(ks)) + for i := range ks { + kbs[i] = []byte(ks[i]) + } + resp.LeaseTimeToLiveResponse.Keys = kbs + } + + v, err = resp.Marshal() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + default: + http.Error(w, fmt.Sprintf("unknown request path %q", r.URL.Path), http.StatusBadRequest) return } @@ -111,3 +161,65 @@ func RenewHTTP(id lease.LeaseID, url string, rt http.RoundTripper, timeout time. } return lresp.TTL, nil } + +// TimeToLiveHTTP retrieves lease information of the given lease ID. +func TimeToLiveHTTP(ctx context.Context, id lease.LeaseID, keys bool, url string, rt http.RoundTripper) (*leasepb.LeaseInternalResponse, error) { + // will post lreq protobuf to leader + lreq, err := (&leasepb.LeaseInternalRequest{&pb.LeaseTimeToLiveRequest{ID: int64(id), Keys: keys}}).Marshal() + if err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", url, bytes.NewReader(lreq)) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/protobuf") + + cancel := httputil.RequestCanceler(req) + + cc := &http.Client{Transport: rt} + var b []byte + errc := make(chan error) + go func() { + // TODO detect if leader failed and retry? + resp, err := cc.Do(req) + if err != nil { + errc <- err + return + } + b, err = ioutil.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + errc <- err + return + } + if resp.StatusCode == http.StatusNotFound { + errc <- lease.ErrLeaseNotFound + return + } + if resp.StatusCode != http.StatusOK { + errc <- fmt.Errorf("lease: unknown error(%s)", string(b)) + return + } + errc <- nil + }() + select { + case derr := <-errc: + if derr != nil { + return nil, derr + } + case <-ctx.Done(): + cancel() + return nil, ctx.Err() + } + + lresp := &leasepb.LeaseInternalResponse{} + if err := lresp.Unmarshal(b); err != nil { + return nil, fmt.Errorf(`lease: %v. data = "%s"`, err, string(b)) + } + if lresp.LeaseTimeToLiveResponse.ID != int64(id) { + return nil, fmt.Errorf("lease: renew id mismatch") + } + return lresp, nil +} diff --git a/lease/leasepb/lease.pb.go b/lease/leasepb/lease.pb.go index 62115a3c9..59e2f8663 100644 --- a/lease/leasepb/lease.pb.go +++ b/lease/leasepb/lease.pb.go @@ -10,6 +10,8 @@ It has these top-level messages: Lease + LeaseInternalRequest + LeaseInternalResponse */ package leasepb @@ -19,9 +21,11 @@ import ( proto "github.com/golang/protobuf/proto" math "math" + + io "io" ) -import io "io" +import etcdserverpb "github.com/coreos/etcd/etcdserver/etcdserverpb" // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal @@ -42,8 +46,28 @@ func (m *Lease) String() string { return proto.CompactTextString(m) } func (*Lease) ProtoMessage() {} func (*Lease) Descriptor() ([]byte, []int) { return fileDescriptorLease, []int{0} } +type LeaseInternalRequest struct { + LeaseTimeToLiveRequest *etcdserverpb.LeaseTimeToLiveRequest `protobuf:"bytes,1,opt,name=LeaseTimeToLiveRequest,json=leaseTimeToLiveRequest" json:"LeaseTimeToLiveRequest,omitempty"` +} + +func (m *LeaseInternalRequest) Reset() { *m = LeaseInternalRequest{} } +func (m *LeaseInternalRequest) String() string { return proto.CompactTextString(m) } +func (*LeaseInternalRequest) ProtoMessage() {} +func (*LeaseInternalRequest) Descriptor() ([]byte, []int) { return fileDescriptorLease, []int{1} } + +type LeaseInternalResponse struct { + LeaseTimeToLiveResponse *etcdserverpb.LeaseTimeToLiveResponse `protobuf:"bytes,1,opt,name=LeaseTimeToLiveResponse,json=leaseTimeToLiveResponse" json:"LeaseTimeToLiveResponse,omitempty"` +} + +func (m *LeaseInternalResponse) Reset() { *m = LeaseInternalResponse{} } +func (m *LeaseInternalResponse) String() string { return proto.CompactTextString(m) } +func (*LeaseInternalResponse) ProtoMessage() {} +func (*LeaseInternalResponse) Descriptor() ([]byte, []int) { return fileDescriptorLease, []int{2} } + func init() { proto.RegisterType((*Lease)(nil), "leasepb.Lease") + proto.RegisterType((*LeaseInternalRequest)(nil), "leasepb.LeaseInternalRequest") + proto.RegisterType((*LeaseInternalResponse)(nil), "leasepb.LeaseInternalResponse") } func (m *Lease) Marshal() (data []byte, err error) { size := m.Size() @@ -73,6 +97,62 @@ func (m *Lease) MarshalTo(data []byte) (int, error) { return i, nil } +func (m *LeaseInternalRequest) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *LeaseInternalRequest) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.LeaseTimeToLiveRequest != nil { + data[i] = 0xa + i++ + i = encodeVarintLease(data, i, uint64(m.LeaseTimeToLiveRequest.Size())) + n1, err := m.LeaseTimeToLiveRequest.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n1 + } + return i, nil +} + +func (m *LeaseInternalResponse) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *LeaseInternalResponse) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.LeaseTimeToLiveResponse != nil { + data[i] = 0xa + i++ + i = encodeVarintLease(data, i, uint64(m.LeaseTimeToLiveResponse.Size())) + n2, err := m.LeaseTimeToLiveResponse.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n2 + } + return i, nil +} + func encodeFixed64Lease(data []byte, offset int, v uint64) int { data[offset] = uint8(v) data[offset+1] = uint8(v >> 8) @@ -112,6 +192,26 @@ func (m *Lease) Size() (n int) { return n } +func (m *LeaseInternalRequest) Size() (n int) { + var l int + _ = l + if m.LeaseTimeToLiveRequest != nil { + l = m.LeaseTimeToLiveRequest.Size() + n += 1 + l + sovLease(uint64(l)) + } + return n +} + +func (m *LeaseInternalResponse) Size() (n int) { + var l int + _ = l + if m.LeaseTimeToLiveResponse != nil { + l = m.LeaseTimeToLiveResponse.Size() + n += 1 + l + sovLease(uint64(l)) + } + return n +} + func sovLease(x uint64) (n int) { for { n++ @@ -213,6 +313,172 @@ func (m *Lease) Unmarshal(data []byte) error { } return nil } +func (m *LeaseInternalRequest) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLease + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: LeaseInternalRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: LeaseInternalRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field LeaseTimeToLiveRequest", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLease + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthLease + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.LeaseTimeToLiveRequest == nil { + m.LeaseTimeToLiveRequest = &etcdserverpb.LeaseTimeToLiveRequest{} + } + if err := m.LeaseTimeToLiveRequest.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipLease(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLease + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *LeaseInternalResponse) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLease + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: LeaseInternalResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: LeaseInternalResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field LeaseTimeToLiveResponse", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLease + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthLease + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.LeaseTimeToLiveResponse == nil { + m.LeaseTimeToLiveResponse = &etcdserverpb.LeaseTimeToLiveResponse{} + } + if err := m.LeaseTimeToLiveResponse.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipLease(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLease + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipLease(data []byte) (n int, err error) { l := len(data) iNdEx := 0 @@ -319,13 +585,20 @@ var ( ) var fileDescriptorLease = []byte{ - // 126 bytes of a gzipped FileDescriptorProto + // 239 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0xce, 0x49, 0x4d, 0x2c, 0x4e, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x07, 0x73, 0x0a, 0x92, 0xa4, 0x44, 0xd2, - 0xf3, 0xd3, 0xf3, 0xc1, 0x62, 0xfa, 0x20, 0x16, 0x44, 0x5a, 0x49, 0x93, 0x8b, 0xd5, 0x07, 0xa4, - 0x40, 0x88, 0x8f, 0x8b, 0xc9, 0xd3, 0x45, 0x82, 0x51, 0x81, 0x51, 0x83, 0x39, 0x88, 0x29, 0xd3, - 0x45, 0x48, 0x80, 0x8b, 0x39, 0x24, 0xc4, 0x47, 0x82, 0x09, 0x2c, 0xc0, 0x5c, 0x12, 0xe2, 0xe3, - 0x24, 0x71, 0xe2, 0xa1, 0x1c, 0xc3, 0x85, 0x87, 0x72, 0x0c, 0x27, 0x1e, 0xc9, 0x31, 0x5e, 0x78, - 0x24, 0xc7, 0xf8, 0xe0, 0x91, 0x1c, 0xe3, 0x8c, 0xc7, 0x72, 0x0c, 0x49, 0x6c, 0x60, 0xb3, 0x8c, - 0x01, 0x01, 0x00, 0x00, 0xff, 0xff, 0x0d, 0xa0, 0x42, 0x1a, 0x79, 0x00, 0x00, 0x00, + 0xf3, 0xd3, 0xf3, 0xc1, 0x62, 0xfa, 0x20, 0x16, 0x44, 0x5a, 0x4a, 0x2d, 0xb5, 0x24, 0x39, 0x45, + 0x1f, 0x44, 0x14, 0xa7, 0x16, 0x95, 0xa5, 0x16, 0x21, 0x31, 0x0b, 0x92, 0xf4, 0x8b, 0x0a, 0x92, + 0x21, 0xea, 0x94, 0x34, 0xb9, 0x58, 0x7d, 0x40, 0x06, 0x09, 0xf1, 0x71, 0x31, 0x79, 0xba, 0x48, + 0x30, 0x2a, 0x30, 0x6a, 0x30, 0x07, 0x31, 0x65, 0xba, 0x08, 0x09, 0x70, 0x31, 0x87, 0x84, 0xf8, + 0x48, 0x30, 0x81, 0x05, 0x98, 0x4b, 0x42, 0x7c, 0x94, 0x4a, 0xb8, 0x44, 0xc0, 0x4a, 0x3d, 0xf3, + 0x4a, 0x52, 0x8b, 0xf2, 0x12, 0x73, 0x82, 0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b, 0x84, 0x62, 0xb8, + 0xc4, 0xc0, 0xe2, 0x21, 0x99, 0xb9, 0xa9, 0x21, 0xf9, 0x3e, 0x99, 0x65, 0xa9, 0x50, 0x19, 0xb0, + 0x69, 0xdc, 0x46, 0x2a, 0x7a, 0xc8, 0x76, 0xeb, 0x61, 0x57, 0x1b, 0x24, 0x96, 0x83, 0x55, 0x5c, + 0xa9, 0x82, 0x4b, 0x14, 0xcd, 0xd6, 0xe2, 0x82, 0xfc, 0xbc, 0xe2, 0x54, 0xa1, 0x78, 0x2e, 0x71, + 0x0c, 0xa3, 0x20, 0x52, 0x50, 0x7b, 0x55, 0x09, 0xd8, 0x0b, 0x51, 0x1c, 0x24, 0x9e, 0x83, 0x5d, + 0xc2, 0x49, 0xe2, 0xc4, 0x43, 0x39, 0x86, 0x0b, 0x0f, 0xe5, 0x18, 0x4e, 0x3c, 0x92, 0x63, 0xbc, + 0xf0, 0x48, 0x8e, 0xf1, 0xc1, 0x23, 0x39, 0xc6, 0x19, 0x8f, 0xe5, 0x18, 0x92, 0xd8, 0xc0, 0x61, + 0x67, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x65, 0xaa, 0x74, 0x2e, 0x91, 0x01, 0x00, 0x00, } diff --git a/lease/leasepb/lease.proto b/lease/leasepb/lease.proto index f2c996c8a..be414b993 100644 --- a/lease/leasepb/lease.proto +++ b/lease/leasepb/lease.proto @@ -2,6 +2,7 @@ syntax = "proto3"; package leasepb; import "gogoproto/gogo.proto"; +import "etcd/etcdserver/etcdserverpb/rpc.proto"; option (gogoproto.marshaler_all) = true; option (gogoproto.sizer_all) = true; @@ -13,3 +14,11 @@ message Lease { int64 ID = 1; int64 TTL = 2; } + +message LeaseInternalRequest { + etcdserverpb.LeaseTimeToLiveRequest LeaseTimeToLiveRequest = 1; +} + +message LeaseInternalResponse { + etcdserverpb.LeaseTimeToLiveResponse LeaseTimeToLiveResponse = 1; +} diff --git a/lease/lessor.go b/lease/lessor.go index 682d7376b..5ebe7290c 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -480,6 +480,20 @@ func (l *Lease) refresh(extend time.Duration) { // forever sets the expiry of lease to be forever. func (l *Lease) forever() { l.expiry = forever } +// Keys returns all the keys attached to the lease. +func (l *Lease) Keys() []string { + keys := make([]string, 0, len(l.itemSet)) + for k := range l.itemSet { + keys = append(keys, k.Key) + } + return keys +} + +// Remaining returns the remaining time of the lease. +func (l *Lease) Remaining() time.Duration { + return l.expiry.Sub(time.Now()) +} + type LeaseItem struct { Key string }