*: add support for lease create and revoke

Basic support for lease operations like create and revoke.
We still need to:
1. attach keys to leases in KV implmentation if lease field is set
2. leader periodically removes expired leases
3. leader serves keepAlive requests and follower forwards keepAlive
requests to leader.
This commit is contained in:
Xiang Li 2016-01-05 13:49:25 -08:00
parent db60cdc42c
commit d9ca929a33
13 changed files with 451 additions and 141 deletions

View File

@ -0,0 +1,123 @@
// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package command
import (
"fmt"
"os"
"strconv"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
// NewLeaseCommand returns the cobra command for "lease".
func NewLeaseCommand() *cobra.Command {
lc := &cobra.Command{
Use: "lease",
Short: "lease is used to manage leases.",
}
lc.AddCommand(NewLeaseCreateCommand())
lc.AddCommand(NewLeaseRevokeCommand())
return lc
}
// NewLeaseCreateCommand returns the cobra command for "lease create".
func NewLeaseCreateCommand() *cobra.Command {
lc := &cobra.Command{
Use: "create",
Short: "create is used to create leases.",
Run: leaseCreateCommandFunc,
}
return lc
}
// leaseCreateCommandFunc executes the "lease create" command.
func leaseCreateCommandFunc(cmd *cobra.Command, args []string) {
if len(args) != 1 {
ExitWithError(ExitBadArgs, fmt.Errorf("lease create command needs TTL arguement."))
}
ttl, err := strconv.ParseInt(args[0], 10, 64)
if err != nil {
ExitWithError(ExitBadArgs, fmt.Errorf("bad TTL (%v)", err))
}
endpoint, err := cmd.Flags().GetString("endpoint")
if err != nil {
ExitWithError(ExitError, err)
}
conn, err := grpc.Dial(endpoint)
if err != nil {
ExitWithError(ExitBadConnection, err)
}
lease := pb.NewLeaseClient(conn)
req := &pb.LeaseCreateRequest{TTL: ttl}
resp, err := lease.LeaseCreate(context.Background(), req)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to create lease (%v)\n", err)
return
}
fmt.Printf("lease %016x created with TTL(%ds)\n", resp.ID, resp.TTL)
}
// NewLeaseRevokeCommand returns the cobra command for "lease revoke".
func NewLeaseRevokeCommand() *cobra.Command {
lc := &cobra.Command{
Use: "revoke",
Short: "revoke is used to revoke leases.",
Run: leaseRevokeCommandFunc,
}
return lc
}
// leaseRevokeCommandFunc executes the "lease create" command.
func leaseRevokeCommandFunc(cmd *cobra.Command, args []string) {
if len(args) != 1 {
ExitWithError(ExitBadArgs, fmt.Errorf("lease revoke command needs 1 argument"))
}
id, err := strconv.ParseInt(args[0], 16, 64)
if err != nil {
ExitWithError(ExitBadArgs, fmt.Errorf("bad lease ID arg (%v), expecting ID in Hex", err))
}
endpoint, err := cmd.Flags().GetString("endpoint")
if err != nil {
ExitWithError(ExitError, err)
}
conn, err := grpc.Dial(endpoint)
if err != nil {
ExitWithError(ExitBadConnection, err)
}
lease := pb.NewLeaseClient(conn)
req := &pb.LeaseRevokeRequest{ID: id}
_, err = lease.LeaseRevoke(context.Background(), req)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to revoke lease (%v)\n", err)
return
}
fmt.Printf("lease %016x revoked\n", id)
}

View File

@ -51,6 +51,7 @@ func init() {
command.NewCompactionCommand(),
command.NewWatchCommand(),
command.NewVersionCommand(),
command.NewLeaseCommand(),
)
}

View File

@ -324,6 +324,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
grpcServer := grpc.NewServer()
etcdserverpb.RegisterKVServer(grpcServer, v3rpc.NewKVServer(s))
etcdserverpb.RegisterWatchServer(grpcServer, v3rpc.NewWatchServer(s))
etcdserverpb.RegisterLeaseServer(grpcServer, v3rpc.NewLeaseServer(s))
go func() { plog.Fatal(grpcServer.Serve(v3l)) }()
}

View File

@ -21,7 +21,8 @@ import (
)
var (
ErrEmptyKey = grpc.Errorf(codes.InvalidArgument, "key is not provided")
ErrCompacted = grpc.Errorf(codes.OutOfRange, storage.ErrCompacted.Error())
ErrFutureRev = grpc.Errorf(codes.OutOfRange, storage.ErrFutureRev.Error())
ErrEmptyKey = grpc.Errorf(codes.InvalidArgument, "key is not provided")
ErrCompacted = grpc.Errorf(codes.OutOfRange, storage.ErrCompacted.Error())
ErrFutureRev = grpc.Errorf(codes.OutOfRange, storage.ErrFutureRev.Error())
ErrLeaseNotFound = grpc.Errorf(codes.NotFound, "requested lease not found")
)

View File

@ -0,0 +1,45 @@
// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package v3rpc
import (
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/etcdserver"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
type LeaseServer struct {
le etcdserver.Lessor
}
func NewLeaseServer(le etcdserver.Lessor) pb.LeaseServer {
return &LeaseServer{le: le}
}
func (ls *LeaseServer) LeaseCreate(ctx context.Context, cr *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) {
return ls.le.LeaseCreate(ctx, cr)
}
func (ls *LeaseServer) LeaseRevoke(ctx context.Context, rr *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
r, err := ls.le.LeaseRevoke(ctx, rr)
if err != nil {
return nil, ErrLeaseNotFound
}
return r, nil
}
func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error {
panic("not implemented")
}

View File

@ -24,6 +24,8 @@ type InternalRaftRequest struct {
DeleteRange *DeleteRangeRequest `protobuf:"bytes,5,opt,name=delete_range" json:"delete_range,omitempty"`
Txn *TxnRequest `protobuf:"bytes,6,opt,name=txn" json:"txn,omitempty"`
Compaction *CompactionRequest `protobuf:"bytes,7,opt,name=compaction" json:"compaction,omitempty"`
LeaseCreate *LeaseCreateRequest `protobuf:"bytes,8,opt,name=lease_create" json:"lease_create,omitempty"`
LeaseRevoke *LeaseRevokeRequest `protobuf:"bytes,9,opt,name=lease_revoke" json:"lease_revoke,omitempty"`
}
func (m *InternalRaftRequest) Reset() { *m = InternalRaftRequest{} }
@ -117,6 +119,26 @@ func (m *InternalRaftRequest) MarshalTo(data []byte) (int, error) {
}
i += n6
}
if m.LeaseCreate != nil {
data[i] = 0x42
i++
i = encodeVarintRaftInternal(data, i, uint64(m.LeaseCreate.Size()))
n7, err := m.LeaseCreate.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n7
}
if m.LeaseRevoke != nil {
data[i] = 0x4a
i++
i = encodeVarintRaftInternal(data, i, uint64(m.LeaseRevoke.Size()))
n8, err := m.LeaseRevoke.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n8
}
return i, nil
}
@ -195,6 +217,14 @@ func (m *InternalRaftRequest) Size() (n int) {
l = m.Compaction.Size()
n += 1 + l + sovRaftInternal(uint64(l))
}
if m.LeaseCreate != nil {
l = m.LeaseCreate.Size()
n += 1 + l + sovRaftInternal(uint64(l))
}
if m.LeaseRevoke != nil {
l = m.LeaseRevoke.Size()
n += 1 + l + sovRaftInternal(uint64(l))
}
return n
}
@ -432,6 +462,66 @@ func (m *InternalRaftRequest) Unmarshal(data []byte) error {
return err
}
iNdEx = postIndex
case 8:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field LeaseCreate", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthRaftInternal
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.LeaseCreate == nil {
m.LeaseCreate = &LeaseCreateRequest{}
}
if err := m.LeaseCreate.Unmarshal(data[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 9:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field LeaseRevoke", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthRaftInternal
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.LeaseRevoke == nil {
m.LeaseRevoke = &LeaseRevokeRequest{}
}
if err := m.LeaseRevoke.Unmarshal(data[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
var sizeOfWire int
for {

View File

@ -15,11 +15,15 @@ option (gogoproto.goproto_getters_all) = false;
message InternalRaftRequest {
uint64 ID = 1;
Request v2 = 2;
RangeRequest range = 3;
PutRequest put = 4;
DeleteRangeRequest delete_range = 5;
TxnRequest txn = 6;
CompactionRequest compaction = 7;
LeaseCreateRequest lease_create = 8;
LeaseRevokeRequest lease_revoke = 9;
}
message EmptyResponse {

View File

@ -441,7 +441,7 @@ func (m *WatchResponse) GetEvents() []*storagepb.Event {
type LeaseCreateRequest struct {
// advisory ttl in seconds
Ttl int64 `protobuf:"varint,1,opt,name=ttl,proto3" json:"ttl,omitempty"`
TTL int64 `protobuf:"varint,1,opt,proto3" json:"TTL,omitempty"`
}
func (m *LeaseCreateRequest) Reset() { *m = LeaseCreateRequest{} }
@ -449,10 +449,10 @@ func (m *LeaseCreateRequest) String() string { return proto.CompactTextString(m)
func (*LeaseCreateRequest) ProtoMessage() {}
type LeaseCreateResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
LeaseId int64 `protobuf:"varint,2,opt,name=lease_id,proto3" json:"lease_id,omitempty"`
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
ID int64 `protobuf:"varint,2,opt,proto3" json:"ID,omitempty"`
// server decided ttl in second
Ttl int64 `protobuf:"varint,3,opt,name=ttl,proto3" json:"ttl,omitempty"`
TTL int64 `protobuf:"varint,3,opt,proto3" json:"TTL,omitempty"`
Error string `protobuf:"bytes,4,opt,name=error,proto3" json:"error,omitempty"`
}
@ -468,7 +468,7 @@ func (m *LeaseCreateResponse) GetHeader() *ResponseHeader {
}
type LeaseRevokeRequest struct {
LeaseId int64 `protobuf:"varint,1,opt,name=lease_id,proto3" json:"lease_id,omitempty"`
ID int64 `protobuf:"varint,1,opt,proto3" json:"ID,omitempty"`
}
func (m *LeaseRevokeRequest) Reset() { *m = LeaseRevokeRequest{} }
@ -491,7 +491,7 @@ func (m *LeaseRevokeResponse) GetHeader() *ResponseHeader {
}
type LeaseKeepAliveRequest struct {
LeaseId int64 `protobuf:"varint,1,opt,name=lease_id,proto3" json:"lease_id,omitempty"`
ID int64 `protobuf:"varint,1,opt,proto3" json:"ID,omitempty"`
}
func (m *LeaseKeepAliveRequest) Reset() { *m = LeaseKeepAliveRequest{} }
@ -499,9 +499,8 @@ func (m *LeaseKeepAliveRequest) String() string { return proto.CompactTextString
func (*LeaseKeepAliveRequest) ProtoMessage() {}
type LeaseKeepAliveResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
LeaseId int64 `protobuf:"varint,2,opt,name=lease_id,proto3" json:"lease_id,omitempty"`
Ttl int64 `protobuf:"varint,3,opt,name=ttl,proto3" json:"ttl,omitempty"`
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
TTL int64 `protobuf:"varint,2,opt,proto3" json:"TTL,omitempty"`
}
func (m *LeaseKeepAliveResponse) Reset() { *m = LeaseKeepAliveResponse{} }
@ -1741,10 +1740,10 @@ func (m *LeaseCreateRequest) MarshalTo(data []byte) (int, error) {
_ = i
var l int
_ = l
if m.Ttl != 0 {
if m.TTL != 0 {
data[i] = 0x8
i++
i = encodeVarintRpc(data, i, uint64(m.Ttl))
i = encodeVarintRpc(data, i, uint64(m.TTL))
}
return i, nil
}
@ -1774,15 +1773,15 @@ func (m *LeaseCreateResponse) MarshalTo(data []byte) (int, error) {
}
i += n15
}
if m.LeaseId != 0 {
if m.ID != 0 {
data[i] = 0x10
i++
i = encodeVarintRpc(data, i, uint64(m.LeaseId))
i = encodeVarintRpc(data, i, uint64(m.ID))
}
if m.Ttl != 0 {
if m.TTL != 0 {
data[i] = 0x18
i++
i = encodeVarintRpc(data, i, uint64(m.Ttl))
i = encodeVarintRpc(data, i, uint64(m.TTL))
}
if len(m.Error) > 0 {
data[i] = 0x22
@ -1808,10 +1807,10 @@ func (m *LeaseRevokeRequest) MarshalTo(data []byte) (int, error) {
_ = i
var l int
_ = l
if m.LeaseId != 0 {
if m.ID != 0 {
data[i] = 0x8
i++
i = encodeVarintRpc(data, i, uint64(m.LeaseId))
i = encodeVarintRpc(data, i, uint64(m.ID))
}
return i, nil
}
@ -1859,10 +1858,10 @@ func (m *LeaseKeepAliveRequest) MarshalTo(data []byte) (int, error) {
_ = i
var l int
_ = l
if m.LeaseId != 0 {
if m.ID != 0 {
data[i] = 0x8
i++
i = encodeVarintRpc(data, i, uint64(m.LeaseId))
i = encodeVarintRpc(data, i, uint64(m.ID))
}
return i, nil
}
@ -1892,15 +1891,10 @@ func (m *LeaseKeepAliveResponse) MarshalTo(data []byte) (int, error) {
}
i += n17
}
if m.LeaseId != 0 {
if m.TTL != 0 {
data[i] = 0x10
i++
i = encodeVarintRpc(data, i, uint64(m.LeaseId))
}
if m.Ttl != 0 {
data[i] = 0x18
i++
i = encodeVarintRpc(data, i, uint64(m.Ttl))
i = encodeVarintRpc(data, i, uint64(m.TTL))
}
return i, nil
}
@ -2258,8 +2252,8 @@ func (m *WatchResponse) Size() (n int) {
func (m *LeaseCreateRequest) Size() (n int) {
var l int
_ = l
if m.Ttl != 0 {
n += 1 + sovRpc(uint64(m.Ttl))
if m.TTL != 0 {
n += 1 + sovRpc(uint64(m.TTL))
}
return n
}
@ -2271,11 +2265,11 @@ func (m *LeaseCreateResponse) Size() (n int) {
l = m.Header.Size()
n += 1 + l + sovRpc(uint64(l))
}
if m.LeaseId != 0 {
n += 1 + sovRpc(uint64(m.LeaseId))
if m.ID != 0 {
n += 1 + sovRpc(uint64(m.ID))
}
if m.Ttl != 0 {
n += 1 + sovRpc(uint64(m.Ttl))
if m.TTL != 0 {
n += 1 + sovRpc(uint64(m.TTL))
}
l = len(m.Error)
if l > 0 {
@ -2287,8 +2281,8 @@ func (m *LeaseCreateResponse) Size() (n int) {
func (m *LeaseRevokeRequest) Size() (n int) {
var l int
_ = l
if m.LeaseId != 0 {
n += 1 + sovRpc(uint64(m.LeaseId))
if m.ID != 0 {
n += 1 + sovRpc(uint64(m.ID))
}
return n
}
@ -2306,8 +2300,8 @@ func (m *LeaseRevokeResponse) Size() (n int) {
func (m *LeaseKeepAliveRequest) Size() (n int) {
var l int
_ = l
if m.LeaseId != 0 {
n += 1 + sovRpc(uint64(m.LeaseId))
if m.ID != 0 {
n += 1 + sovRpc(uint64(m.ID))
}
return n
}
@ -2319,11 +2313,8 @@ func (m *LeaseKeepAliveResponse) Size() (n int) {
l = m.Header.Size()
n += 1 + l + sovRpc(uint64(l))
}
if m.LeaseId != 0 {
n += 1 + sovRpc(uint64(m.LeaseId))
}
if m.Ttl != 0 {
n += 1 + sovRpc(uint64(m.Ttl))
if m.TTL != 0 {
n += 1 + sovRpc(uint64(m.TTL))
}
return n
}
@ -4351,16 +4342,16 @@ func (m *LeaseCreateRequest) Unmarshal(data []byte) error {
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Ttl", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field TTL", wireType)
}
m.Ttl = 0
m.TTL = 0
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.Ttl |= (int64(b) & 0x7F) << shift
m.TTL |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
@ -4442,32 +4433,32 @@ func (m *LeaseCreateResponse) Unmarshal(data []byte) error {
iNdEx = postIndex
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field LeaseId", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field ID", wireType)
}
m.LeaseId = 0
m.ID = 0
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.LeaseId |= (int64(b) & 0x7F) << shift
m.ID |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Ttl", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field TTL", wireType)
}
m.Ttl = 0
m.TTL = 0
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.Ttl |= (int64(b) & 0x7F) << shift
m.TTL |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
@ -4545,16 +4536,16 @@ func (m *LeaseRevokeRequest) Unmarshal(data []byte) error {
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field LeaseId", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field ID", wireType)
}
m.LeaseId = 0
m.ID = 0
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.LeaseId |= (int64(b) & 0x7F) << shift
m.ID |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
@ -4681,16 +4672,16 @@ func (m *LeaseKeepAliveRequest) Unmarshal(data []byte) error {
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field LeaseId", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field ID", wireType)
}
m.LeaseId = 0
m.ID = 0
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.LeaseId |= (int64(b) & 0x7F) << shift
m.ID |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
@ -4772,32 +4763,16 @@ func (m *LeaseKeepAliveResponse) Unmarshal(data []byte) error {
iNdEx = postIndex
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field LeaseId", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field TTL", wireType)
}
m.LeaseId = 0
m.TTL = 0
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.LeaseId |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Ttl", wireType)
}
m.Ttl = 0
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.Ttl |= (int64(b) & 0x7F) << shift
m.TTL |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}

View File

@ -243,19 +243,19 @@ message WatchResponse {
message LeaseCreateRequest {
// advisory ttl in seconds
int64 ttl = 1;
int64 TTL = 1;
}
message LeaseCreateResponse {
ResponseHeader header = 1;
int64 lease_id = 2;
int64 ID = 2;
// server decided ttl in second
int64 ttl = 3;
int64 TTL = 3;
string error = 4;
}
message LeaseRevokeRequest {
int64 lease_id = 1;
int64 ID = 1;
}
message LeaseRevokeResponse {
@ -263,11 +263,10 @@ message LeaseRevokeResponse {
}
message LeaseKeepAliveRequest {
int64 lease_id = 1;
int64 ID = 1;
}
message LeaseKeepAliveResponse {
ResponseHeader header = 1;
int64 lease_id = 2;
int64 ttl = 3;
int64 TTL = 2;
}

View File

@ -34,6 +34,7 @@ import (
"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/pkg/idutil"
"github.com/coreos/etcd/pkg/pbutil"
@ -166,8 +167,9 @@ type EtcdServer struct {
store store.Store
kv dstorage.ConsistentWatchableKV
be backend.Backend
kv dstorage.ConsistentWatchableKV
lessor lease.Lessor
be backend.Backend
stats *stats.ServerStats
lstats *stats.LeaderStats
@ -360,6 +362,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if cfg.V3demo {
srv.be = backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename))
srv.kv = dstorage.New(srv.be, &srv.consistIndex)
srv.lessor = lease.NewLessor(uint8(id), srv.be, srv.kv)
}
// TODO: move transport initialization near the definition of remote
@ -589,6 +592,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
plog.Panicf("rename snapshot file error: %v", err)
}
// TODO: recover leassor
newbe := backend.NewDefaultBackend(fn)
if err := s.kv.Restore(newbe); err != nil {
plog.Panicf("restore KV error: %v", err)

View File

@ -34,6 +34,11 @@ type RaftKV interface {
Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
}
type Lessor interface {
LeaseCreate(ctx context.Context, r *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error)
LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
}
func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Range: r})
if err != nil {
@ -74,6 +79,22 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.
return result.resp.(*pb.CompactionResponse), result.err
}
func (s *EtcdServer) LeaseCreate(ctx context.Context, r *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) {
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{LeaseCreate: r})
if err != nil {
return nil, err
}
return result.resp.(*pb.LeaseCreateResponse), result.err
}
func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{LeaseRevoke: r})
if err != nil {
return nil, err
}
return result.resp.(*pb.LeaseRevokeResponse), result.err
}
type applyResult struct {
resp proto.Message
err error
@ -115,6 +136,7 @@ const (
func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) interface{} {
kv := s.getKV()
le := s.lessor
ar := &applyResult{}
@ -129,6 +151,10 @@ func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) interface{} {
ar.resp, ar.err = applyTxn(kv, r.Txn)
case r.Compaction != nil:
ar.resp, ar.err = applyCompaction(kv, r.Compaction)
case r.LeaseCreate != nil:
ar.resp, ar.err = applyLeaseCreate(le, r.LeaseCreate)
case r.LeaseRevoke != nil:
ar.resp, ar.err = applyLeaseRevoke(le, r.LeaseRevoke)
default:
panic("not implemented")
}
@ -348,6 +374,18 @@ func applyCompare(txnID int64, kv dstorage.KV, c *pb.Compare) (int64, bool) {
return rev, true
}
func applyLeaseCreate(le lease.Lessor, lc *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) {
l := le.Grant(lc.TTL)
return &pb.LeaseCreateResponse{ID: int64(l.ID), TTL: l.TTL}, nil
}
func applyLeaseRevoke(le lease.Lessor, lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
err := le.Revoke(lease.LeaseID(lc.ID))
return &pb.LeaseRevokeResponse{}, err
}
func compareInt64(a, b int64) int {
switch {
case a < b:

View File

@ -43,8 +43,17 @@ type DeleteableRange interface {
DeleteRange(key, end []byte) (int64, int64)
}
// a lessor is the owner of leases. It can grant, revoke,
// renew and modify leases for lessee.
// A Lessor is the owner of leases. It can grant, revoke, renew and modify leases for lessee.
type Lessor interface {
// Grant grants a lease that expires at least after TTL seconds.
Grant(ttl int64) *Lease
// Revoke revokes a lease with given ID. The item attached to the
// given lease will be removed. If the ID does not exist, an error
// will be returned.
Revoke(id LeaseID) error
}
// lessor implements Lessor interface.
// TODO: use clockwork for testability.
type lessor struct {
mu sync.Mutex
@ -54,7 +63,7 @@ type lessor struct {
// We want to make Grant, Revoke, and FindExpired all O(logN) and
// Renew O(1).
// FindExpired and Renew should be the most frequent operations.
leaseMap map[LeaseID]*lease
leaseMap map[LeaseID]*Lease
// A DeleteableRange the lessor operates on.
// When a lease expires, the lessor will delete the
@ -68,9 +77,19 @@ type lessor struct {
idgen *idutil.Generator
}
func NewLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor {
func NewLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) Lessor {
return newLessor(lessorID, b, dr)
}
func newLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor {
// ensure the most significant bit of lessorID is 0.
// so all the IDs generated by id generator will be greater than 0.
if int8(lessorID) < 0 {
lessorID = uint8(-int8(lessorID))
}
l := &lessor{
leaseMap: make(map[LeaseID]*lease),
leaseMap: make(map[LeaseID]*Lease),
b: b,
dr: dr,
idgen: idutil.NewGenerator(lessorID, time.Now()),
@ -80,10 +99,9 @@ func NewLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor {
return l
}
// Grant grants a lease that expires at least after TTL seconds.
// TODO: when lessor is under high load, it should give out lease
// with longer TTL to reduce renew load.
func (le *lessor) Grant(ttl int64) *lease {
func (le *lessor) Grant(ttl int64) *Lease {
// TODO: define max TTL
expiry := time.Now().Add(time.Duration(ttl) * time.Second)
expiry = minExpiry(time.Now(), expiry)
@ -93,7 +111,7 @@ func (le *lessor) Grant(ttl int64) *lease {
le.mu.Lock()
defer le.mu.Unlock()
l := &lease{id: id, ttl: ttl, expiry: expiry, itemSet: make(map[leaseItem]struct{})}
l := &Lease{ID: id, TTL: ttl, expiry: expiry, itemSet: make(map[leaseItem]struct{})}
if _, ok := le.leaseMap[id]; ok {
panic("lease: unexpected duplicate ID!")
}
@ -104,9 +122,6 @@ func (le *lessor) Grant(ttl int64) *lease {
return l
}
// Revoke revokes a lease with given ID. The item attached to the
// given lease will be removed. If the ID does not exist, an error
// will be returned.
func (le *lessor) Revoke(id LeaseID) error {
le.mu.Lock()
defer le.mu.Unlock()
@ -120,7 +135,7 @@ func (le *lessor) Revoke(id LeaseID) error {
le.dr.DeleteRange([]byte(item.key), nil)
}
delete(le.leaseMap, l.id)
delete(le.leaseMap, l.ID)
l.removeFrom(le.b)
return nil
@ -138,7 +153,7 @@ func (le *lessor) Renew(id LeaseID) error {
return fmt.Errorf("lease: cannot find lease %x", id)
}
expiry := time.Now().Add(time.Duration(l.ttl) * time.Second)
expiry := time.Now().Add(time.Duration(l.TTL) * time.Second)
l.expiry = minExpiry(time.Now(), expiry)
return nil
}
@ -161,13 +176,24 @@ func (le *lessor) Attach(id LeaseID, items []leaseItem) error {
return nil
}
// findExpiredLeases loops all the leases in the leaseMap and returns the expired
// leases that needed to be revoked.
func (le *lessor) findExpiredLeases() []*lease {
func (le *lessor) Recover(b backend.Backend, dr DeleteableRange) {
le.mu.Lock()
defer le.mu.Unlock()
leases := make([]*lease, 0, 16)
le.b = b
le.dr = dr
le.leaseMap = make(map[LeaseID]*Lease)
le.initAndRecover()
}
// findExpiredLeases loops all the leases in the leaseMap and returns the expired
// leases that needed to be revoked.
func (le *lessor) findExpiredLeases() []*Lease {
le.mu.Lock()
defer le.mu.Unlock()
leases := make([]*Lease, 0, 16)
now := time.Now()
for _, l := range le.leaseMap {
@ -181,7 +207,7 @@ func (le *lessor) findExpiredLeases() []*lease {
// get gets the lease with given id.
// get is a helper fucntion for testing, at least for now.
func (le *lessor) get(id LeaseID) *lease {
func (le *lessor) get(id LeaseID) *Lease {
le.mu.Lock()
defer le.mu.Unlock()
@ -191,7 +217,6 @@ func (le *lessor) get(id LeaseID) *lease {
func (le *lessor) initAndRecover() {
tx := le.b.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafeCreateBucket(leaseBucketName)
_, vs := tx.UnsafeRange(leaseBucketName, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0)
@ -200,33 +225,36 @@ func (le *lessor) initAndRecover() {
var lpb leasepb.Lease
err := lpb.Unmarshal(vs[i])
if err != nil {
tx.Unlock()
panic("failed to unmarshal lease proto item")
}
id := LeaseID(lpb.ID)
le.leaseMap[id] = &lease{
id: id,
ttl: lpb.TTL,
ID := LeaseID(lpb.ID)
le.leaseMap[ID] = &Lease{
ID: ID,
TTL: lpb.TTL,
// itemSet will be filled in when recover key-value pairs
expiry: minExpiry(time.Now(), time.Now().Add(time.Second*time.Duration(lpb.TTL))),
}
}
tx.Unlock()
le.b.ForceCommit()
}
type lease struct {
id LeaseID
ttl int64 // time to live in seconds
type Lease struct {
ID LeaseID
TTL int64 // time to live in seconds
itemSet map[leaseItem]struct{}
// expiry time in unixnano
expiry time.Time
}
func (l lease) persistTo(b backend.Backend) {
key := int64ToBytes(int64(l.id))
func (l Lease) persistTo(b backend.Backend) {
key := int64ToBytes(int64(l.ID))
lpb := leasepb.Lease{ID: int64(l.id), TTL: int64(l.ttl)}
lpb := leasepb.Lease{ID: int64(l.ID), TTL: int64(l.TTL)}
val, err := lpb.Marshal()
if err != nil {
panic("failed to marshal lease proto item")
@ -237,8 +265,8 @@ func (l lease) persistTo(b backend.Backend) {
b.BatchTx().Unlock()
}
func (l lease) removeFrom(b backend.Backend) {
key := int64ToBytes(int64(l.id))
func (l Lease) removeFrom(b backend.Backend) {
key := int64ToBytes(int64(l.ID))
b.BatchTx().Lock()
b.BatchTx().UnsafeDelete(leaseBucketName, key)

View File

@ -33,10 +33,10 @@ func TestLessorGrant(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
le := NewLessor(1, be, &fakeDeleteable{})
le := newLessor(1, be, &fakeDeleteable{})
l := le.Grant(1)
gl := le.get(l.id)
gl := le.get(l.ID)
if !reflect.DeepEqual(gl, l) {
t.Errorf("lease = %v, want %v", gl, l)
@ -46,12 +46,12 @@ func TestLessorGrant(t *testing.T) {
}
nl := le.Grant(1)
if nl.id == l.id {
t.Errorf("new lease.id = %x, want != %x", nl.id, l.id)
if nl.ID == l.ID {
t.Errorf("new lease.id = %x, want != %x", nl.ID, l.ID)
}
be.BatchTx().Lock()
_, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(int64(l.id)), nil, 0)
_, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(int64(l.ID)), nil, 0)
if len(vs) != 1 {
t.Errorf("len(vs) = %d, want 1", len(vs))
}
@ -69,7 +69,7 @@ func TestLessorRevoke(t *testing.T) {
fd := &fakeDeleteable{}
le := NewLessor(1, be, fd)
le := newLessor(1, be, fd)
// grant a lease with long term (100 seconds) to
// avoid early termination during the test.
@ -80,18 +80,18 @@ func TestLessorRevoke(t *testing.T) {
{"bar"},
}
err := le.Attach(l.id, items)
err := le.Attach(l.ID, items)
if err != nil {
t.Fatalf("failed to attach items to the lease: %v", err)
}
err = le.Revoke(l.id)
err = le.Revoke(l.ID)
if err != nil {
t.Fatal("failed to revoke lease:", err)
}
if le.get(l.id) != nil {
t.Errorf("got revoked lease %x", l.id)
if le.get(l.ID) != nil {
t.Errorf("got revoked lease %x", l.ID)
}
wdeleted := []string{"foo_", "bar_"}
@ -100,7 +100,7 @@ func TestLessorRevoke(t *testing.T) {
}
be.BatchTx().Lock()
_, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(int64(l.id)), nil, 0)
_, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(int64(l.ID)), nil, 0)
if len(vs) != 0 {
t.Errorf("len(vs) = %d, want 0", len(vs))
}
@ -113,14 +113,14 @@ func TestLessorRenew(t *testing.T) {
defer be.Close()
defer os.RemoveAll(dir)
le := NewLessor(1, be, &fakeDeleteable{})
le := newLessor(1, be, &fakeDeleteable{})
l := le.Grant(5)
// manually change the ttl field
l.ttl = 10
l.TTL = 10
le.Renew(l.id)
l = le.get(l.id)
le.Renew(l.ID)
l = le.get(l.ID)
if l.expiry.Sub(time.Now()) < 9*time.Second {
t.Errorf("failed to renew the lease")
@ -134,20 +134,20 @@ func TestLessorRecover(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
le := NewLessor(1, be, &fakeDeleteable{})
le := newLessor(1, be, &fakeDeleteable{})
l1 := le.Grant(10)
l2 := le.Grant(20)
// Create a new lessor with the same backend
nle := NewLessor(1, be, &fakeDeleteable{})
nl1 := nle.get(l1.id)
if nl1 == nil || nl1.ttl != l1.ttl {
t.Errorf("nl1 = %v, want nl1.TTL= %d", l1.ttl)
nle := newLessor(1, be, &fakeDeleteable{})
nl1 := nle.get(l1.ID)
if nl1 == nil || nl1.TTL != l1.TTL {
t.Errorf("nl1 = %v, want nl1.TTL= %d", nl1.TTL, l1.TTL)
}
nl2 := nle.get(l2.id)
if nl2 == nil || nl2.ttl != l2.ttl {
t.Errorf("nl2 = %v, want nl2.TTL= %d", l2.ttl)
nl2 := nle.get(l2.ID)
if nl2 == nil || nl2.TTL != l2.TTL {
t.Errorf("nl2 = %v, want nl2.TTL= %d", nl2.TTL, l2.TTL)
}
}