From d9ca929a336d76ddfaef4d74819e8c2bbf7edb45 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 5 Jan 2016 13:49:25 -0800 Subject: [PATCH] *: add support for lease create and revoke Basic support for lease operations like create and revoke. We still need to: 1. attach keys to leases in KV implmentation if lease field is set 2. leader periodically removes expired leases 3. leader serves keepAlive requests and follower forwards keepAlive requests to leader. --- etcdctlv3/command/lease_command.go | 123 +++++++++++++++++++ etcdctlv3/main.go | 1 + etcdmain/etcd.go | 1 + etcdserver/api/v3rpc/error.go | 7 +- etcdserver/api/v3rpc/lease.go | 45 +++++++ etcdserver/etcdserverpb/raft_internal.pb.go | 90 ++++++++++++++ etcdserver/etcdserverpb/raft_internal.proto | 4 + etcdserver/etcdserverpb/rpc.pb.go | 125 ++++++++------------ etcdserver/etcdserverpb/rpc.proto | 13 +- etcdserver/server.go | 9 +- etcdserver/v3demo_server.go | 38 ++++++ lease/lessor.go | 90 +++++++++----- lease/lessor_test.go | 46 +++---- 13 files changed, 451 insertions(+), 141 deletions(-) create mode 100644 etcdctlv3/command/lease_command.go create mode 100644 etcdserver/api/v3rpc/lease.go diff --git a/etcdctlv3/command/lease_command.go b/etcdctlv3/command/lease_command.go new file mode 100644 index 000000000..c287653b4 --- /dev/null +++ b/etcdctlv3/command/lease_command.go @@ -0,0 +1,123 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "fmt" + "os" + "strconv" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +// NewLeaseCommand returns the cobra command for "lease". +func NewLeaseCommand() *cobra.Command { + lc := &cobra.Command{ + Use: "lease", + Short: "lease is used to manage leases.", + } + + lc.AddCommand(NewLeaseCreateCommand()) + lc.AddCommand(NewLeaseRevokeCommand()) + + return lc +} + +// NewLeaseCreateCommand returns the cobra command for "lease create". +func NewLeaseCreateCommand() *cobra.Command { + lc := &cobra.Command{ + Use: "create", + Short: "create is used to create leases.", + + Run: leaseCreateCommandFunc, + } + + return lc +} + +// leaseCreateCommandFunc executes the "lease create" command. +func leaseCreateCommandFunc(cmd *cobra.Command, args []string) { + if len(args) != 1 { + ExitWithError(ExitBadArgs, fmt.Errorf("lease create command needs TTL arguement.")) + } + + ttl, err := strconv.ParseInt(args[0], 10, 64) + if err != nil { + ExitWithError(ExitBadArgs, fmt.Errorf("bad TTL (%v)", err)) + } + + endpoint, err := cmd.Flags().GetString("endpoint") + if err != nil { + ExitWithError(ExitError, err) + } + conn, err := grpc.Dial(endpoint) + if err != nil { + ExitWithError(ExitBadConnection, err) + } + lease := pb.NewLeaseClient(conn) + + req := &pb.LeaseCreateRequest{TTL: ttl} + resp, err := lease.LeaseCreate(context.Background(), req) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to create lease (%v)\n", err) + return + } + fmt.Printf("lease %016x created with TTL(%ds)\n", resp.ID, resp.TTL) +} + +// NewLeaseRevokeCommand returns the cobra command for "lease revoke". +func NewLeaseRevokeCommand() *cobra.Command { + lc := &cobra.Command{ + Use: "revoke", + Short: "revoke is used to revoke leases.", + + Run: leaseRevokeCommandFunc, + } + + return lc +} + +// leaseRevokeCommandFunc executes the "lease create" command. +func leaseRevokeCommandFunc(cmd *cobra.Command, args []string) { + if len(args) != 1 { + ExitWithError(ExitBadArgs, fmt.Errorf("lease revoke command needs 1 argument")) + } + + id, err := strconv.ParseInt(args[0], 16, 64) + if err != nil { + ExitWithError(ExitBadArgs, fmt.Errorf("bad lease ID arg (%v), expecting ID in Hex", err)) + } + + endpoint, err := cmd.Flags().GetString("endpoint") + if err != nil { + ExitWithError(ExitError, err) + } + conn, err := grpc.Dial(endpoint) + if err != nil { + ExitWithError(ExitBadConnection, err) + } + lease := pb.NewLeaseClient(conn) + + req := &pb.LeaseRevokeRequest{ID: id} + _, err = lease.LeaseRevoke(context.Background(), req) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to revoke lease (%v)\n", err) + return + } + fmt.Printf("lease %016x revoked\n", id) +} diff --git a/etcdctlv3/main.go b/etcdctlv3/main.go index 04ed982c3..450196eb2 100644 --- a/etcdctlv3/main.go +++ b/etcdctlv3/main.go @@ -51,6 +51,7 @@ func init() { command.NewCompactionCommand(), command.NewWatchCommand(), command.NewVersionCommand(), + command.NewLeaseCommand(), ) } diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 3f83c379d..fe89829f2 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -324,6 +324,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { grpcServer := grpc.NewServer() etcdserverpb.RegisterKVServer(grpcServer, v3rpc.NewKVServer(s)) etcdserverpb.RegisterWatchServer(grpcServer, v3rpc.NewWatchServer(s)) + etcdserverpb.RegisterLeaseServer(grpcServer, v3rpc.NewLeaseServer(s)) go func() { plog.Fatal(grpcServer.Serve(v3l)) }() } diff --git a/etcdserver/api/v3rpc/error.go b/etcdserver/api/v3rpc/error.go index 5ad2269c1..35229518e 100644 --- a/etcdserver/api/v3rpc/error.go +++ b/etcdserver/api/v3rpc/error.go @@ -21,7 +21,8 @@ import ( ) var ( - ErrEmptyKey = grpc.Errorf(codes.InvalidArgument, "key is not provided") - ErrCompacted = grpc.Errorf(codes.OutOfRange, storage.ErrCompacted.Error()) - ErrFutureRev = grpc.Errorf(codes.OutOfRange, storage.ErrFutureRev.Error()) + ErrEmptyKey = grpc.Errorf(codes.InvalidArgument, "key is not provided") + ErrCompacted = grpc.Errorf(codes.OutOfRange, storage.ErrCompacted.Error()) + ErrFutureRev = grpc.Errorf(codes.OutOfRange, storage.ErrFutureRev.Error()) + ErrLeaseNotFound = grpc.Errorf(codes.NotFound, "requested lease not found") ) diff --git a/etcdserver/api/v3rpc/lease.go b/etcdserver/api/v3rpc/lease.go new file mode 100644 index 000000000..d8b90c237 --- /dev/null +++ b/etcdserver/api/v3rpc/lease.go @@ -0,0 +1,45 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v3rpc + +import ( + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/etcdserver" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +type LeaseServer struct { + le etcdserver.Lessor +} + +func NewLeaseServer(le etcdserver.Lessor) pb.LeaseServer { + return &LeaseServer{le: le} +} + +func (ls *LeaseServer) LeaseCreate(ctx context.Context, cr *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) { + return ls.le.LeaseCreate(ctx, cr) +} + +func (ls *LeaseServer) LeaseRevoke(ctx context.Context, rr *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { + r, err := ls.le.LeaseRevoke(ctx, rr) + if err != nil { + return nil, ErrLeaseNotFound + } + return r, nil +} + +func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error { + panic("not implemented") +} diff --git a/etcdserver/etcdserverpb/raft_internal.pb.go b/etcdserver/etcdserverpb/raft_internal.pb.go index 25773a69b..8d6e2b591 100644 --- a/etcdserver/etcdserverpb/raft_internal.pb.go +++ b/etcdserver/etcdserverpb/raft_internal.pb.go @@ -24,6 +24,8 @@ type InternalRaftRequest struct { DeleteRange *DeleteRangeRequest `protobuf:"bytes,5,opt,name=delete_range" json:"delete_range,omitempty"` Txn *TxnRequest `protobuf:"bytes,6,opt,name=txn" json:"txn,omitempty"` Compaction *CompactionRequest `protobuf:"bytes,7,opt,name=compaction" json:"compaction,omitempty"` + LeaseCreate *LeaseCreateRequest `protobuf:"bytes,8,opt,name=lease_create" json:"lease_create,omitempty"` + LeaseRevoke *LeaseRevokeRequest `protobuf:"bytes,9,opt,name=lease_revoke" json:"lease_revoke,omitempty"` } func (m *InternalRaftRequest) Reset() { *m = InternalRaftRequest{} } @@ -117,6 +119,26 @@ func (m *InternalRaftRequest) MarshalTo(data []byte) (int, error) { } i += n6 } + if m.LeaseCreate != nil { + data[i] = 0x42 + i++ + i = encodeVarintRaftInternal(data, i, uint64(m.LeaseCreate.Size())) + n7, err := m.LeaseCreate.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n7 + } + if m.LeaseRevoke != nil { + data[i] = 0x4a + i++ + i = encodeVarintRaftInternal(data, i, uint64(m.LeaseRevoke.Size())) + n8, err := m.LeaseRevoke.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n8 + } return i, nil } @@ -195,6 +217,14 @@ func (m *InternalRaftRequest) Size() (n int) { l = m.Compaction.Size() n += 1 + l + sovRaftInternal(uint64(l)) } + if m.LeaseCreate != nil { + l = m.LeaseCreate.Size() + n += 1 + l + sovRaftInternal(uint64(l)) + } + if m.LeaseRevoke != nil { + l = m.LeaseRevoke.Size() + n += 1 + l + sovRaftInternal(uint64(l)) + } return n } @@ -432,6 +462,66 @@ func (m *InternalRaftRequest) Unmarshal(data []byte) error { return err } iNdEx = postIndex + case 8: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field LeaseCreate", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRaftInternal + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.LeaseCreate == nil { + m.LeaseCreate = &LeaseCreateRequest{} + } + if err := m.LeaseCreate.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 9: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field LeaseRevoke", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRaftInternal + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.LeaseRevoke == nil { + m.LeaseRevoke = &LeaseRevokeRequest{} + } + if err := m.LeaseRevoke.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: var sizeOfWire int for { diff --git a/etcdserver/etcdserverpb/raft_internal.proto b/etcdserver/etcdserverpb/raft_internal.proto index 6b9adc6c7..e45161604 100644 --- a/etcdserver/etcdserverpb/raft_internal.proto +++ b/etcdserver/etcdserverpb/raft_internal.proto @@ -15,11 +15,15 @@ option (gogoproto.goproto_getters_all) = false; message InternalRaftRequest { uint64 ID = 1; Request v2 = 2; + RangeRequest range = 3; PutRequest put = 4; DeleteRangeRequest delete_range = 5; TxnRequest txn = 6; CompactionRequest compaction = 7; + + LeaseCreateRequest lease_create = 8; + LeaseRevokeRequest lease_revoke = 9; } message EmptyResponse { diff --git a/etcdserver/etcdserverpb/rpc.pb.go b/etcdserver/etcdserverpb/rpc.pb.go index da9848371..b8acf3c60 100644 --- a/etcdserver/etcdserverpb/rpc.pb.go +++ b/etcdserver/etcdserverpb/rpc.pb.go @@ -441,7 +441,7 @@ func (m *WatchResponse) GetEvents() []*storagepb.Event { type LeaseCreateRequest struct { // advisory ttl in seconds - Ttl int64 `protobuf:"varint,1,opt,name=ttl,proto3" json:"ttl,omitempty"` + TTL int64 `protobuf:"varint,1,opt,proto3" json:"TTL,omitempty"` } func (m *LeaseCreateRequest) Reset() { *m = LeaseCreateRequest{} } @@ -449,10 +449,10 @@ func (m *LeaseCreateRequest) String() string { return proto.CompactTextString(m) func (*LeaseCreateRequest) ProtoMessage() {} type LeaseCreateResponse struct { - Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"` - LeaseId int64 `protobuf:"varint,2,opt,name=lease_id,proto3" json:"lease_id,omitempty"` + Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"` + ID int64 `protobuf:"varint,2,opt,proto3" json:"ID,omitempty"` // server decided ttl in second - Ttl int64 `protobuf:"varint,3,opt,name=ttl,proto3" json:"ttl,omitempty"` + TTL int64 `protobuf:"varint,3,opt,proto3" json:"TTL,omitempty"` Error string `protobuf:"bytes,4,opt,name=error,proto3" json:"error,omitempty"` } @@ -468,7 +468,7 @@ func (m *LeaseCreateResponse) GetHeader() *ResponseHeader { } type LeaseRevokeRequest struct { - LeaseId int64 `protobuf:"varint,1,opt,name=lease_id,proto3" json:"lease_id,omitempty"` + ID int64 `protobuf:"varint,1,opt,proto3" json:"ID,omitempty"` } func (m *LeaseRevokeRequest) Reset() { *m = LeaseRevokeRequest{} } @@ -491,7 +491,7 @@ func (m *LeaseRevokeResponse) GetHeader() *ResponseHeader { } type LeaseKeepAliveRequest struct { - LeaseId int64 `protobuf:"varint,1,opt,name=lease_id,proto3" json:"lease_id,omitempty"` + ID int64 `protobuf:"varint,1,opt,proto3" json:"ID,omitempty"` } func (m *LeaseKeepAliveRequest) Reset() { *m = LeaseKeepAliveRequest{} } @@ -499,9 +499,8 @@ func (m *LeaseKeepAliveRequest) String() string { return proto.CompactTextString func (*LeaseKeepAliveRequest) ProtoMessage() {} type LeaseKeepAliveResponse struct { - Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"` - LeaseId int64 `protobuf:"varint,2,opt,name=lease_id,proto3" json:"lease_id,omitempty"` - Ttl int64 `protobuf:"varint,3,opt,name=ttl,proto3" json:"ttl,omitempty"` + Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"` + TTL int64 `protobuf:"varint,2,opt,proto3" json:"TTL,omitempty"` } func (m *LeaseKeepAliveResponse) Reset() { *m = LeaseKeepAliveResponse{} } @@ -1741,10 +1740,10 @@ func (m *LeaseCreateRequest) MarshalTo(data []byte) (int, error) { _ = i var l int _ = l - if m.Ttl != 0 { + if m.TTL != 0 { data[i] = 0x8 i++ - i = encodeVarintRpc(data, i, uint64(m.Ttl)) + i = encodeVarintRpc(data, i, uint64(m.TTL)) } return i, nil } @@ -1774,15 +1773,15 @@ func (m *LeaseCreateResponse) MarshalTo(data []byte) (int, error) { } i += n15 } - if m.LeaseId != 0 { + if m.ID != 0 { data[i] = 0x10 i++ - i = encodeVarintRpc(data, i, uint64(m.LeaseId)) + i = encodeVarintRpc(data, i, uint64(m.ID)) } - if m.Ttl != 0 { + if m.TTL != 0 { data[i] = 0x18 i++ - i = encodeVarintRpc(data, i, uint64(m.Ttl)) + i = encodeVarintRpc(data, i, uint64(m.TTL)) } if len(m.Error) > 0 { data[i] = 0x22 @@ -1808,10 +1807,10 @@ func (m *LeaseRevokeRequest) MarshalTo(data []byte) (int, error) { _ = i var l int _ = l - if m.LeaseId != 0 { + if m.ID != 0 { data[i] = 0x8 i++ - i = encodeVarintRpc(data, i, uint64(m.LeaseId)) + i = encodeVarintRpc(data, i, uint64(m.ID)) } return i, nil } @@ -1859,10 +1858,10 @@ func (m *LeaseKeepAliveRequest) MarshalTo(data []byte) (int, error) { _ = i var l int _ = l - if m.LeaseId != 0 { + if m.ID != 0 { data[i] = 0x8 i++ - i = encodeVarintRpc(data, i, uint64(m.LeaseId)) + i = encodeVarintRpc(data, i, uint64(m.ID)) } return i, nil } @@ -1892,15 +1891,10 @@ func (m *LeaseKeepAliveResponse) MarshalTo(data []byte) (int, error) { } i += n17 } - if m.LeaseId != 0 { + if m.TTL != 0 { data[i] = 0x10 i++ - i = encodeVarintRpc(data, i, uint64(m.LeaseId)) - } - if m.Ttl != 0 { - data[i] = 0x18 - i++ - i = encodeVarintRpc(data, i, uint64(m.Ttl)) + i = encodeVarintRpc(data, i, uint64(m.TTL)) } return i, nil } @@ -2258,8 +2252,8 @@ func (m *WatchResponse) Size() (n int) { func (m *LeaseCreateRequest) Size() (n int) { var l int _ = l - if m.Ttl != 0 { - n += 1 + sovRpc(uint64(m.Ttl)) + if m.TTL != 0 { + n += 1 + sovRpc(uint64(m.TTL)) } return n } @@ -2271,11 +2265,11 @@ func (m *LeaseCreateResponse) Size() (n int) { l = m.Header.Size() n += 1 + l + sovRpc(uint64(l)) } - if m.LeaseId != 0 { - n += 1 + sovRpc(uint64(m.LeaseId)) + if m.ID != 0 { + n += 1 + sovRpc(uint64(m.ID)) } - if m.Ttl != 0 { - n += 1 + sovRpc(uint64(m.Ttl)) + if m.TTL != 0 { + n += 1 + sovRpc(uint64(m.TTL)) } l = len(m.Error) if l > 0 { @@ -2287,8 +2281,8 @@ func (m *LeaseCreateResponse) Size() (n int) { func (m *LeaseRevokeRequest) Size() (n int) { var l int _ = l - if m.LeaseId != 0 { - n += 1 + sovRpc(uint64(m.LeaseId)) + if m.ID != 0 { + n += 1 + sovRpc(uint64(m.ID)) } return n } @@ -2306,8 +2300,8 @@ func (m *LeaseRevokeResponse) Size() (n int) { func (m *LeaseKeepAliveRequest) Size() (n int) { var l int _ = l - if m.LeaseId != 0 { - n += 1 + sovRpc(uint64(m.LeaseId)) + if m.ID != 0 { + n += 1 + sovRpc(uint64(m.ID)) } return n } @@ -2319,11 +2313,8 @@ func (m *LeaseKeepAliveResponse) Size() (n int) { l = m.Header.Size() n += 1 + l + sovRpc(uint64(l)) } - if m.LeaseId != 0 { - n += 1 + sovRpc(uint64(m.LeaseId)) - } - if m.Ttl != 0 { - n += 1 + sovRpc(uint64(m.Ttl)) + if m.TTL != 0 { + n += 1 + sovRpc(uint64(m.TTL)) } return n } @@ -4351,16 +4342,16 @@ func (m *LeaseCreateRequest) Unmarshal(data []byte) error { switch fieldNum { case 1: if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Ttl", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field TTL", wireType) } - m.Ttl = 0 + m.TTL = 0 for shift := uint(0); ; shift += 7 { if iNdEx >= l { return io.ErrUnexpectedEOF } b := data[iNdEx] iNdEx++ - m.Ttl |= (int64(b) & 0x7F) << shift + m.TTL |= (int64(b) & 0x7F) << shift if b < 0x80 { break } @@ -4442,32 +4433,32 @@ func (m *LeaseCreateResponse) Unmarshal(data []byte) error { iNdEx = postIndex case 2: if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field LeaseId", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field ID", wireType) } - m.LeaseId = 0 + m.ID = 0 for shift := uint(0); ; shift += 7 { if iNdEx >= l { return io.ErrUnexpectedEOF } b := data[iNdEx] iNdEx++ - m.LeaseId |= (int64(b) & 0x7F) << shift + m.ID |= (int64(b) & 0x7F) << shift if b < 0x80 { break } } case 3: if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Ttl", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field TTL", wireType) } - m.Ttl = 0 + m.TTL = 0 for shift := uint(0); ; shift += 7 { if iNdEx >= l { return io.ErrUnexpectedEOF } b := data[iNdEx] iNdEx++ - m.Ttl |= (int64(b) & 0x7F) << shift + m.TTL |= (int64(b) & 0x7F) << shift if b < 0x80 { break } @@ -4545,16 +4536,16 @@ func (m *LeaseRevokeRequest) Unmarshal(data []byte) error { switch fieldNum { case 1: if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field LeaseId", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field ID", wireType) } - m.LeaseId = 0 + m.ID = 0 for shift := uint(0); ; shift += 7 { if iNdEx >= l { return io.ErrUnexpectedEOF } b := data[iNdEx] iNdEx++ - m.LeaseId |= (int64(b) & 0x7F) << shift + m.ID |= (int64(b) & 0x7F) << shift if b < 0x80 { break } @@ -4681,16 +4672,16 @@ func (m *LeaseKeepAliveRequest) Unmarshal(data []byte) error { switch fieldNum { case 1: if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field LeaseId", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field ID", wireType) } - m.LeaseId = 0 + m.ID = 0 for shift := uint(0); ; shift += 7 { if iNdEx >= l { return io.ErrUnexpectedEOF } b := data[iNdEx] iNdEx++ - m.LeaseId |= (int64(b) & 0x7F) << shift + m.ID |= (int64(b) & 0x7F) << shift if b < 0x80 { break } @@ -4772,32 +4763,16 @@ func (m *LeaseKeepAliveResponse) Unmarshal(data []byte) error { iNdEx = postIndex case 2: if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field LeaseId", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field TTL", wireType) } - m.LeaseId = 0 + m.TTL = 0 for shift := uint(0); ; shift += 7 { if iNdEx >= l { return io.ErrUnexpectedEOF } b := data[iNdEx] iNdEx++ - m.LeaseId |= (int64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - case 3: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Ttl", wireType) - } - m.Ttl = 0 - for shift := uint(0); ; shift += 7 { - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := data[iNdEx] - iNdEx++ - m.Ttl |= (int64(b) & 0x7F) << shift + m.TTL |= (int64(b) & 0x7F) << shift if b < 0x80 { break } diff --git a/etcdserver/etcdserverpb/rpc.proto b/etcdserver/etcdserverpb/rpc.proto index 4e4b19ce9..e8b468ec5 100644 --- a/etcdserver/etcdserverpb/rpc.proto +++ b/etcdserver/etcdserverpb/rpc.proto @@ -243,19 +243,19 @@ message WatchResponse { message LeaseCreateRequest { // advisory ttl in seconds - int64 ttl = 1; + int64 TTL = 1; } message LeaseCreateResponse { ResponseHeader header = 1; - int64 lease_id = 2; + int64 ID = 2; // server decided ttl in second - int64 ttl = 3; + int64 TTL = 3; string error = 4; } message LeaseRevokeRequest { - int64 lease_id = 1; + int64 ID = 1; } message LeaseRevokeResponse { @@ -263,11 +263,10 @@ message LeaseRevokeResponse { } message LeaseKeepAliveRequest { - int64 lease_id = 1; + int64 ID = 1; } message LeaseKeepAliveResponse { ResponseHeader header = 1; - int64 lease_id = 2; - int64 ttl = 3; + int64 TTL = 2; } diff --git a/etcdserver/server.go b/etcdserver/server.go index 1c9514b60..9042bfda9 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -34,6 +34,7 @@ import ( "github.com/coreos/etcd/etcdserver/etcdhttp/httptypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/etcdserver/stats" + "github.com/coreos/etcd/lease" "github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/idutil" "github.com/coreos/etcd/pkg/pbutil" @@ -166,8 +167,9 @@ type EtcdServer struct { store store.Store - kv dstorage.ConsistentWatchableKV - be backend.Backend + kv dstorage.ConsistentWatchableKV + lessor lease.Lessor + be backend.Backend stats *stats.ServerStats lstats *stats.LeaderStats @@ -360,6 +362,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { if cfg.V3demo { srv.be = backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename)) srv.kv = dstorage.New(srv.be, &srv.consistIndex) + srv.lessor = lease.NewLessor(uint8(id), srv.be, srv.kv) } // TODO: move transport initialization near the definition of remote @@ -589,6 +592,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { plog.Panicf("rename snapshot file error: %v", err) } + // TODO: recover leassor + newbe := backend.NewDefaultBackend(fn) if err := s.kv.Restore(newbe); err != nil { plog.Panicf("restore KV error: %v", err) diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go index 0f3c4d65d..b5516d9e0 100644 --- a/etcdserver/v3demo_server.go +++ b/etcdserver/v3demo_server.go @@ -34,6 +34,11 @@ type RaftKV interface { Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) } +type Lessor interface { + LeaseCreate(ctx context.Context, r *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) + LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) +} + func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Range: r}) if err != nil { @@ -74,6 +79,22 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb. return result.resp.(*pb.CompactionResponse), result.err } +func (s *EtcdServer) LeaseCreate(ctx context.Context, r *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) { + result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{LeaseCreate: r}) + if err != nil { + return nil, err + } + return result.resp.(*pb.LeaseCreateResponse), result.err +} + +func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { + result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{LeaseRevoke: r}) + if err != nil { + return nil, err + } + return result.resp.(*pb.LeaseRevokeResponse), result.err +} + type applyResult struct { resp proto.Message err error @@ -115,6 +136,7 @@ const ( func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) interface{} { kv := s.getKV() + le := s.lessor ar := &applyResult{} @@ -129,6 +151,10 @@ func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) interface{} { ar.resp, ar.err = applyTxn(kv, r.Txn) case r.Compaction != nil: ar.resp, ar.err = applyCompaction(kv, r.Compaction) + case r.LeaseCreate != nil: + ar.resp, ar.err = applyLeaseCreate(le, r.LeaseCreate) + case r.LeaseRevoke != nil: + ar.resp, ar.err = applyLeaseRevoke(le, r.LeaseRevoke) default: panic("not implemented") } @@ -348,6 +374,18 @@ func applyCompare(txnID int64, kv dstorage.KV, c *pb.Compare) (int64, bool) { return rev, true } +func applyLeaseCreate(le lease.Lessor, lc *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) { + l := le.Grant(lc.TTL) + + return &pb.LeaseCreateResponse{ID: int64(l.ID), TTL: l.TTL}, nil +} + +func applyLeaseRevoke(le lease.Lessor, lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { + err := le.Revoke(lease.LeaseID(lc.ID)) + + return &pb.LeaseRevokeResponse{}, err +} + func compareInt64(a, b int64) int { switch { case a < b: diff --git a/lease/lessor.go b/lease/lessor.go index 3c9333268..cb90d04b5 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -43,8 +43,17 @@ type DeleteableRange interface { DeleteRange(key, end []byte) (int64, int64) } -// a lessor is the owner of leases. It can grant, revoke, -// renew and modify leases for lessee. +// A Lessor is the owner of leases. It can grant, revoke, renew and modify leases for lessee. +type Lessor interface { + // Grant grants a lease that expires at least after TTL seconds. + Grant(ttl int64) *Lease + // Revoke revokes a lease with given ID. The item attached to the + // given lease will be removed. If the ID does not exist, an error + // will be returned. + Revoke(id LeaseID) error +} + +// lessor implements Lessor interface. // TODO: use clockwork for testability. type lessor struct { mu sync.Mutex @@ -54,7 +63,7 @@ type lessor struct { // We want to make Grant, Revoke, and FindExpired all O(logN) and // Renew O(1). // FindExpired and Renew should be the most frequent operations. - leaseMap map[LeaseID]*lease + leaseMap map[LeaseID]*Lease // A DeleteableRange the lessor operates on. // When a lease expires, the lessor will delete the @@ -68,9 +77,19 @@ type lessor struct { idgen *idutil.Generator } -func NewLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor { +func NewLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) Lessor { + return newLessor(lessorID, b, dr) +} + +func newLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor { + // ensure the most significant bit of lessorID is 0. + // so all the IDs generated by id generator will be greater than 0. + if int8(lessorID) < 0 { + lessorID = uint8(-int8(lessorID)) + } + l := &lessor{ - leaseMap: make(map[LeaseID]*lease), + leaseMap: make(map[LeaseID]*Lease), b: b, dr: dr, idgen: idutil.NewGenerator(lessorID, time.Now()), @@ -80,10 +99,9 @@ func NewLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor { return l } -// Grant grants a lease that expires at least after TTL seconds. // TODO: when lessor is under high load, it should give out lease // with longer TTL to reduce renew load. -func (le *lessor) Grant(ttl int64) *lease { +func (le *lessor) Grant(ttl int64) *Lease { // TODO: define max TTL expiry := time.Now().Add(time.Duration(ttl) * time.Second) expiry = minExpiry(time.Now(), expiry) @@ -93,7 +111,7 @@ func (le *lessor) Grant(ttl int64) *lease { le.mu.Lock() defer le.mu.Unlock() - l := &lease{id: id, ttl: ttl, expiry: expiry, itemSet: make(map[leaseItem]struct{})} + l := &Lease{ID: id, TTL: ttl, expiry: expiry, itemSet: make(map[leaseItem]struct{})} if _, ok := le.leaseMap[id]; ok { panic("lease: unexpected duplicate ID!") } @@ -104,9 +122,6 @@ func (le *lessor) Grant(ttl int64) *lease { return l } -// Revoke revokes a lease with given ID. The item attached to the -// given lease will be removed. If the ID does not exist, an error -// will be returned. func (le *lessor) Revoke(id LeaseID) error { le.mu.Lock() defer le.mu.Unlock() @@ -120,7 +135,7 @@ func (le *lessor) Revoke(id LeaseID) error { le.dr.DeleteRange([]byte(item.key), nil) } - delete(le.leaseMap, l.id) + delete(le.leaseMap, l.ID) l.removeFrom(le.b) return nil @@ -138,7 +153,7 @@ func (le *lessor) Renew(id LeaseID) error { return fmt.Errorf("lease: cannot find lease %x", id) } - expiry := time.Now().Add(time.Duration(l.ttl) * time.Second) + expiry := time.Now().Add(time.Duration(l.TTL) * time.Second) l.expiry = minExpiry(time.Now(), expiry) return nil } @@ -161,13 +176,24 @@ func (le *lessor) Attach(id LeaseID, items []leaseItem) error { return nil } -// findExpiredLeases loops all the leases in the leaseMap and returns the expired -// leases that needed to be revoked. -func (le *lessor) findExpiredLeases() []*lease { +func (le *lessor) Recover(b backend.Backend, dr DeleteableRange) { le.mu.Lock() defer le.mu.Unlock() - leases := make([]*lease, 0, 16) + le.b = b + le.dr = dr + le.leaseMap = make(map[LeaseID]*Lease) + + le.initAndRecover() +} + +// findExpiredLeases loops all the leases in the leaseMap and returns the expired +// leases that needed to be revoked. +func (le *lessor) findExpiredLeases() []*Lease { + le.mu.Lock() + defer le.mu.Unlock() + + leases := make([]*Lease, 0, 16) now := time.Now() for _, l := range le.leaseMap { @@ -181,7 +207,7 @@ func (le *lessor) findExpiredLeases() []*lease { // get gets the lease with given id. // get is a helper fucntion for testing, at least for now. -func (le *lessor) get(id LeaseID) *lease { +func (le *lessor) get(id LeaseID) *Lease { le.mu.Lock() defer le.mu.Unlock() @@ -191,7 +217,6 @@ func (le *lessor) get(id LeaseID) *lease { func (le *lessor) initAndRecover() { tx := le.b.BatchTx() tx.Lock() - defer tx.Unlock() tx.UnsafeCreateBucket(leaseBucketName) _, vs := tx.UnsafeRange(leaseBucketName, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0) @@ -200,33 +225,36 @@ func (le *lessor) initAndRecover() { var lpb leasepb.Lease err := lpb.Unmarshal(vs[i]) if err != nil { + tx.Unlock() panic("failed to unmarshal lease proto item") } - id := LeaseID(lpb.ID) - le.leaseMap[id] = &lease{ - id: id, - ttl: lpb.TTL, + ID := LeaseID(lpb.ID) + le.leaseMap[ID] = &Lease{ + ID: ID, + TTL: lpb.TTL, // itemSet will be filled in when recover key-value pairs expiry: minExpiry(time.Now(), time.Now().Add(time.Second*time.Duration(lpb.TTL))), } } + tx.Unlock() + le.b.ForceCommit() } -type lease struct { - id LeaseID - ttl int64 // time to live in seconds +type Lease struct { + ID LeaseID + TTL int64 // time to live in seconds itemSet map[leaseItem]struct{} // expiry time in unixnano expiry time.Time } -func (l lease) persistTo(b backend.Backend) { - key := int64ToBytes(int64(l.id)) +func (l Lease) persistTo(b backend.Backend) { + key := int64ToBytes(int64(l.ID)) - lpb := leasepb.Lease{ID: int64(l.id), TTL: int64(l.ttl)} + lpb := leasepb.Lease{ID: int64(l.ID), TTL: int64(l.TTL)} val, err := lpb.Marshal() if err != nil { panic("failed to marshal lease proto item") @@ -237,8 +265,8 @@ func (l lease) persistTo(b backend.Backend) { b.BatchTx().Unlock() } -func (l lease) removeFrom(b backend.Backend) { - key := int64ToBytes(int64(l.id)) +func (l Lease) removeFrom(b backend.Backend) { + key := int64ToBytes(int64(l.ID)) b.BatchTx().Lock() b.BatchTx().UnsafeDelete(leaseBucketName, key) diff --git a/lease/lessor_test.go b/lease/lessor_test.go index 9eae6b310..6ce3435c3 100644 --- a/lease/lessor_test.go +++ b/lease/lessor_test.go @@ -33,10 +33,10 @@ func TestLessorGrant(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := NewLessor(1, be, &fakeDeleteable{}) + le := newLessor(1, be, &fakeDeleteable{}) l := le.Grant(1) - gl := le.get(l.id) + gl := le.get(l.ID) if !reflect.DeepEqual(gl, l) { t.Errorf("lease = %v, want %v", gl, l) @@ -46,12 +46,12 @@ func TestLessorGrant(t *testing.T) { } nl := le.Grant(1) - if nl.id == l.id { - t.Errorf("new lease.id = %x, want != %x", nl.id, l.id) + if nl.ID == l.ID { + t.Errorf("new lease.id = %x, want != %x", nl.ID, l.ID) } be.BatchTx().Lock() - _, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(int64(l.id)), nil, 0) + _, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(int64(l.ID)), nil, 0) if len(vs) != 1 { t.Errorf("len(vs) = %d, want 1", len(vs)) } @@ -69,7 +69,7 @@ func TestLessorRevoke(t *testing.T) { fd := &fakeDeleteable{} - le := NewLessor(1, be, fd) + le := newLessor(1, be, fd) // grant a lease with long term (100 seconds) to // avoid early termination during the test. @@ -80,18 +80,18 @@ func TestLessorRevoke(t *testing.T) { {"bar"}, } - err := le.Attach(l.id, items) + err := le.Attach(l.ID, items) if err != nil { t.Fatalf("failed to attach items to the lease: %v", err) } - err = le.Revoke(l.id) + err = le.Revoke(l.ID) if err != nil { t.Fatal("failed to revoke lease:", err) } - if le.get(l.id) != nil { - t.Errorf("got revoked lease %x", l.id) + if le.get(l.ID) != nil { + t.Errorf("got revoked lease %x", l.ID) } wdeleted := []string{"foo_", "bar_"} @@ -100,7 +100,7 @@ func TestLessorRevoke(t *testing.T) { } be.BatchTx().Lock() - _, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(int64(l.id)), nil, 0) + _, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(int64(l.ID)), nil, 0) if len(vs) != 0 { t.Errorf("len(vs) = %d, want 0", len(vs)) } @@ -113,14 +113,14 @@ func TestLessorRenew(t *testing.T) { defer be.Close() defer os.RemoveAll(dir) - le := NewLessor(1, be, &fakeDeleteable{}) + le := newLessor(1, be, &fakeDeleteable{}) l := le.Grant(5) // manually change the ttl field - l.ttl = 10 + l.TTL = 10 - le.Renew(l.id) - l = le.get(l.id) + le.Renew(l.ID) + l = le.get(l.ID) if l.expiry.Sub(time.Now()) < 9*time.Second { t.Errorf("failed to renew the lease") @@ -134,20 +134,20 @@ func TestLessorRecover(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := NewLessor(1, be, &fakeDeleteable{}) + le := newLessor(1, be, &fakeDeleteable{}) l1 := le.Grant(10) l2 := le.Grant(20) // Create a new lessor with the same backend - nle := NewLessor(1, be, &fakeDeleteable{}) - nl1 := nle.get(l1.id) - if nl1 == nil || nl1.ttl != l1.ttl { - t.Errorf("nl1 = %v, want nl1.TTL= %d", l1.ttl) + nle := newLessor(1, be, &fakeDeleteable{}) + nl1 := nle.get(l1.ID) + if nl1 == nil || nl1.TTL != l1.TTL { + t.Errorf("nl1 = %v, want nl1.TTL= %d", nl1.TTL, l1.TTL) } - nl2 := nle.get(l2.id) - if nl2 == nil || nl2.ttl != l2.ttl { - t.Errorf("nl2 = %v, want nl2.TTL= %d", l2.ttl) + nl2 := nle.get(l2.ID) + if nl2 == nil || nl2.TTL != l2.TTL { + t.Errorf("nl2 = %v, want nl2.TTL= %d", nl2.TTL, l2.TTL) } }