Merge pull request #4162 from xiang90/lease

*: add support for lease create and revoke
This commit is contained in:
Xiang Li 2016-01-07 16:58:59 -08:00
commit 99bee2fd29
13 changed files with 448 additions and 140 deletions

View File

@ -0,0 +1,123 @@
// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package command
import (
"fmt"
"os"
"strconv"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
// NewLeaseCommand returns the cobra command for "lease".
func NewLeaseCommand() *cobra.Command {
lc := &cobra.Command{
Use: "lease",
Short: "lease is used to manage leases.",
}
lc.AddCommand(NewLeaseCreateCommand())
lc.AddCommand(NewLeaseRevokeCommand())
return lc
}
// NewLeaseCreateCommand returns the cobra command for "lease create".
func NewLeaseCreateCommand() *cobra.Command {
lc := &cobra.Command{
Use: "create",
Short: "create is used to create leases.",
Run: leaseCreateCommandFunc,
}
return lc
}
// leaseCreateCommandFunc executes the "lease create" command.
func leaseCreateCommandFunc(cmd *cobra.Command, args []string) {
if len(args) != 1 {
ExitWithError(ExitBadArgs, fmt.Errorf("lease create command needs TTL arguement."))
}
ttl, err := strconv.ParseInt(args[0], 10, 64)
if err != nil {
ExitWithError(ExitBadArgs, fmt.Errorf("bad TTL (%v)", err))
}
endpoint, err := cmd.Flags().GetString("endpoint")
if err != nil {
ExitWithError(ExitError, err)
}
conn, err := grpc.Dial(endpoint)
if err != nil {
ExitWithError(ExitBadConnection, err)
}
lease := pb.NewLeaseClient(conn)
req := &pb.LeaseCreateRequest{TTL: ttl}
resp, err := lease.LeaseCreate(context.Background(), req)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to create lease (%v)\n", err)
return
}
fmt.Printf("lease %016x created with TTL(%ds)\n", resp.ID, resp.TTL)
}
// NewLeaseRevokeCommand returns the cobra command for "lease revoke".
func NewLeaseRevokeCommand() *cobra.Command {
lc := &cobra.Command{
Use: "revoke",
Short: "revoke is used to revoke leases.",
Run: leaseRevokeCommandFunc,
}
return lc
}
// leaseRevokeCommandFunc executes the "lease create" command.
func leaseRevokeCommandFunc(cmd *cobra.Command, args []string) {
if len(args) != 1 {
ExitWithError(ExitBadArgs, fmt.Errorf("lease revoke command needs 1 argument"))
}
id, err := strconv.ParseInt(args[0], 16, 64)
if err != nil {
ExitWithError(ExitBadArgs, fmt.Errorf("bad lease ID arg (%v), expecting ID in Hex", err))
}
endpoint, err := cmd.Flags().GetString("endpoint")
if err != nil {
ExitWithError(ExitError, err)
}
conn, err := grpc.Dial(endpoint)
if err != nil {
ExitWithError(ExitBadConnection, err)
}
lease := pb.NewLeaseClient(conn)
req := &pb.LeaseRevokeRequest{ID: id}
_, err = lease.LeaseRevoke(context.Background(), req)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to revoke lease (%v)\n", err)
return
}
fmt.Printf("lease %016x revoked\n", id)
}

View File

@ -51,6 +51,7 @@ func init() {
command.NewCompactionCommand(),
command.NewWatchCommand(),
command.NewVersionCommand(),
command.NewLeaseCommand(),
)
}

View File

@ -332,6 +332,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
grpcServer := grpc.NewServer()
etcdserverpb.RegisterKVServer(grpcServer, v3rpc.NewKVServer(s))
etcdserverpb.RegisterWatchServer(grpcServer, v3rpc.NewWatchServer(s))
etcdserverpb.RegisterLeaseServer(grpcServer, v3rpc.NewLeaseServer(s))
go func() { plog.Fatal(grpcServer.Serve(v3l)) }()
}

View File

@ -21,7 +21,8 @@ import (
)
var (
ErrEmptyKey = grpc.Errorf(codes.InvalidArgument, "key is not provided")
ErrCompacted = grpc.Errorf(codes.OutOfRange, storage.ErrCompacted.Error())
ErrFutureRev = grpc.Errorf(codes.OutOfRange, storage.ErrFutureRev.Error())
ErrEmptyKey = grpc.Errorf(codes.InvalidArgument, "key is not provided")
ErrCompacted = grpc.Errorf(codes.OutOfRange, storage.ErrCompacted.Error())
ErrFutureRev = grpc.Errorf(codes.OutOfRange, storage.ErrFutureRev.Error())
ErrLeaseNotFound = grpc.Errorf(codes.NotFound, "requested lease not found")
)

View File

@ -0,0 +1,45 @@
// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package v3rpc
import (
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/etcdserver"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
type LeaseServer struct {
le etcdserver.Lessor
}
func NewLeaseServer(le etcdserver.Lessor) pb.LeaseServer {
return &LeaseServer{le: le}
}
func (ls *LeaseServer) LeaseCreate(ctx context.Context, cr *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) {
return ls.le.LeaseCreate(ctx, cr)
}
func (ls *LeaseServer) LeaseRevoke(ctx context.Context, rr *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
r, err := ls.le.LeaseRevoke(ctx, rr)
if err != nil {
return nil, ErrLeaseNotFound
}
return r, nil
}
func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error {
panic("not implemented")
}

View File

@ -24,6 +24,8 @@ type InternalRaftRequest struct {
DeleteRange *DeleteRangeRequest `protobuf:"bytes,5,opt,name=delete_range" json:"delete_range,omitempty"`
Txn *TxnRequest `protobuf:"bytes,6,opt,name=txn" json:"txn,omitempty"`
Compaction *CompactionRequest `protobuf:"bytes,7,opt,name=compaction" json:"compaction,omitempty"`
LeaseCreate *LeaseCreateRequest `protobuf:"bytes,8,opt,name=lease_create" json:"lease_create,omitempty"`
LeaseRevoke *LeaseRevokeRequest `protobuf:"bytes,9,opt,name=lease_revoke" json:"lease_revoke,omitempty"`
}
func (m *InternalRaftRequest) Reset() { *m = InternalRaftRequest{} }
@ -117,6 +119,26 @@ func (m *InternalRaftRequest) MarshalTo(data []byte) (int, error) {
}
i += n6
}
if m.LeaseCreate != nil {
data[i] = 0x42
i++
i = encodeVarintRaftInternal(data, i, uint64(m.LeaseCreate.Size()))
n7, err := m.LeaseCreate.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n7
}
if m.LeaseRevoke != nil {
data[i] = 0x4a
i++
i = encodeVarintRaftInternal(data, i, uint64(m.LeaseRevoke.Size()))
n8, err := m.LeaseRevoke.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n8
}
return i, nil
}
@ -195,6 +217,14 @@ func (m *InternalRaftRequest) Size() (n int) {
l = m.Compaction.Size()
n += 1 + l + sovRaftInternal(uint64(l))
}
if m.LeaseCreate != nil {
l = m.LeaseCreate.Size()
n += 1 + l + sovRaftInternal(uint64(l))
}
if m.LeaseRevoke != nil {
l = m.LeaseRevoke.Size()
n += 1 + l + sovRaftInternal(uint64(l))
}
return n
}
@ -432,6 +462,66 @@ func (m *InternalRaftRequest) Unmarshal(data []byte) error {
return err
}
iNdEx = postIndex
case 8:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field LeaseCreate", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthRaftInternal
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.LeaseCreate == nil {
m.LeaseCreate = &LeaseCreateRequest{}
}
if err := m.LeaseCreate.Unmarshal(data[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 9:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field LeaseRevoke", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthRaftInternal
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.LeaseRevoke == nil {
m.LeaseRevoke = &LeaseRevokeRequest{}
}
if err := m.LeaseRevoke.Unmarshal(data[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
var sizeOfWire int
for {

View File

@ -15,11 +15,15 @@ option (gogoproto.goproto_getters_all) = false;
message InternalRaftRequest {
uint64 ID = 1;
Request v2 = 2;
RangeRequest range = 3;
PutRequest put = 4;
DeleteRangeRequest delete_range = 5;
TxnRequest txn = 6;
CompactionRequest compaction = 7;
LeaseCreateRequest lease_create = 8;
LeaseRevokeRequest lease_revoke = 9;
}
message EmptyResponse {

View File

@ -441,7 +441,7 @@ func (m *WatchResponse) GetEvents() []*storagepb.Event {
type LeaseCreateRequest struct {
// advisory ttl in seconds
Ttl int64 `protobuf:"varint,1,opt,name=ttl,proto3" json:"ttl,omitempty"`
TTL int64 `protobuf:"varint,1,opt,proto3" json:"TTL,omitempty"`
}
func (m *LeaseCreateRequest) Reset() { *m = LeaseCreateRequest{} }
@ -449,10 +449,10 @@ func (m *LeaseCreateRequest) String() string { return proto.CompactTextString(m)
func (*LeaseCreateRequest) ProtoMessage() {}
type LeaseCreateResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
LeaseId int64 `protobuf:"varint,2,opt,name=lease_id,proto3" json:"lease_id,omitempty"`
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
ID int64 `protobuf:"varint,2,opt,proto3" json:"ID,omitempty"`
// server decided ttl in second
Ttl int64 `protobuf:"varint,3,opt,name=ttl,proto3" json:"ttl,omitempty"`
TTL int64 `protobuf:"varint,3,opt,proto3" json:"TTL,omitempty"`
Error string `protobuf:"bytes,4,opt,name=error,proto3" json:"error,omitempty"`
}
@ -468,7 +468,7 @@ func (m *LeaseCreateResponse) GetHeader() *ResponseHeader {
}
type LeaseRevokeRequest struct {
LeaseId int64 `protobuf:"varint,1,opt,name=lease_id,proto3" json:"lease_id,omitempty"`
ID int64 `protobuf:"varint,1,opt,proto3" json:"ID,omitempty"`
}
func (m *LeaseRevokeRequest) Reset() { *m = LeaseRevokeRequest{} }
@ -491,7 +491,7 @@ func (m *LeaseRevokeResponse) GetHeader() *ResponseHeader {
}
type LeaseKeepAliveRequest struct {
LeaseId int64 `protobuf:"varint,1,opt,name=lease_id,proto3" json:"lease_id,omitempty"`
ID int64 `protobuf:"varint,1,opt,proto3" json:"ID,omitempty"`
}
func (m *LeaseKeepAliveRequest) Reset() { *m = LeaseKeepAliveRequest{} }
@ -499,9 +499,8 @@ func (m *LeaseKeepAliveRequest) String() string { return proto.CompactTextString
func (*LeaseKeepAliveRequest) ProtoMessage() {}
type LeaseKeepAliveResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
LeaseId int64 `protobuf:"varint,2,opt,name=lease_id,proto3" json:"lease_id,omitempty"`
Ttl int64 `protobuf:"varint,3,opt,name=ttl,proto3" json:"ttl,omitempty"`
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
TTL int64 `protobuf:"varint,2,opt,proto3" json:"TTL,omitempty"`
}
func (m *LeaseKeepAliveResponse) Reset() { *m = LeaseKeepAliveResponse{} }
@ -1741,10 +1740,10 @@ func (m *LeaseCreateRequest) MarshalTo(data []byte) (int, error) {
_ = i
var l int
_ = l
if m.Ttl != 0 {
if m.TTL != 0 {
data[i] = 0x8
i++
i = encodeVarintRpc(data, i, uint64(m.Ttl))
i = encodeVarintRpc(data, i, uint64(m.TTL))
}
return i, nil
}
@ -1774,15 +1773,15 @@ func (m *LeaseCreateResponse) MarshalTo(data []byte) (int, error) {
}
i += n15
}
if m.LeaseId != 0 {
if m.ID != 0 {
data[i] = 0x10
i++
i = encodeVarintRpc(data, i, uint64(m.LeaseId))
i = encodeVarintRpc(data, i, uint64(m.ID))
}
if m.Ttl != 0 {
if m.TTL != 0 {
data[i] = 0x18
i++
i = encodeVarintRpc(data, i, uint64(m.Ttl))
i = encodeVarintRpc(data, i, uint64(m.TTL))
}
if len(m.Error) > 0 {
data[i] = 0x22
@ -1808,10 +1807,10 @@ func (m *LeaseRevokeRequest) MarshalTo(data []byte) (int, error) {
_ = i
var l int
_ = l
if m.LeaseId != 0 {
if m.ID != 0 {
data[i] = 0x8
i++
i = encodeVarintRpc(data, i, uint64(m.LeaseId))
i = encodeVarintRpc(data, i, uint64(m.ID))
}
return i, nil
}
@ -1859,10 +1858,10 @@ func (m *LeaseKeepAliveRequest) MarshalTo(data []byte) (int, error) {
_ = i
var l int
_ = l
if m.LeaseId != 0 {
if m.ID != 0 {
data[i] = 0x8
i++
i = encodeVarintRpc(data, i, uint64(m.LeaseId))
i = encodeVarintRpc(data, i, uint64(m.ID))
}
return i, nil
}
@ -1892,15 +1891,10 @@ func (m *LeaseKeepAliveResponse) MarshalTo(data []byte) (int, error) {
}
i += n17
}
if m.LeaseId != 0 {
if m.TTL != 0 {
data[i] = 0x10
i++
i = encodeVarintRpc(data, i, uint64(m.LeaseId))
}
if m.Ttl != 0 {
data[i] = 0x18
i++
i = encodeVarintRpc(data, i, uint64(m.Ttl))
i = encodeVarintRpc(data, i, uint64(m.TTL))
}
return i, nil
}
@ -2258,8 +2252,8 @@ func (m *WatchResponse) Size() (n int) {
func (m *LeaseCreateRequest) Size() (n int) {
var l int
_ = l
if m.Ttl != 0 {
n += 1 + sovRpc(uint64(m.Ttl))
if m.TTL != 0 {
n += 1 + sovRpc(uint64(m.TTL))
}
return n
}
@ -2271,11 +2265,11 @@ func (m *LeaseCreateResponse) Size() (n int) {
l = m.Header.Size()
n += 1 + l + sovRpc(uint64(l))
}
if m.LeaseId != 0 {
n += 1 + sovRpc(uint64(m.LeaseId))
if m.ID != 0 {
n += 1 + sovRpc(uint64(m.ID))
}
if m.Ttl != 0 {
n += 1 + sovRpc(uint64(m.Ttl))
if m.TTL != 0 {
n += 1 + sovRpc(uint64(m.TTL))
}
l = len(m.Error)
if l > 0 {
@ -2287,8 +2281,8 @@ func (m *LeaseCreateResponse) Size() (n int) {
func (m *LeaseRevokeRequest) Size() (n int) {
var l int
_ = l
if m.LeaseId != 0 {
n += 1 + sovRpc(uint64(m.LeaseId))
if m.ID != 0 {
n += 1 + sovRpc(uint64(m.ID))
}
return n
}
@ -2306,8 +2300,8 @@ func (m *LeaseRevokeResponse) Size() (n int) {
func (m *LeaseKeepAliveRequest) Size() (n int) {
var l int
_ = l
if m.LeaseId != 0 {
n += 1 + sovRpc(uint64(m.LeaseId))
if m.ID != 0 {
n += 1 + sovRpc(uint64(m.ID))
}
return n
}
@ -2319,11 +2313,8 @@ func (m *LeaseKeepAliveResponse) Size() (n int) {
l = m.Header.Size()
n += 1 + l + sovRpc(uint64(l))
}
if m.LeaseId != 0 {
n += 1 + sovRpc(uint64(m.LeaseId))
}
if m.Ttl != 0 {
n += 1 + sovRpc(uint64(m.Ttl))
if m.TTL != 0 {
n += 1 + sovRpc(uint64(m.TTL))
}
return n
}
@ -4351,16 +4342,16 @@ func (m *LeaseCreateRequest) Unmarshal(data []byte) error {
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Ttl", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field TTL", wireType)
}
m.Ttl = 0
m.TTL = 0
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.Ttl |= (int64(b) & 0x7F) << shift
m.TTL |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
@ -4442,32 +4433,32 @@ func (m *LeaseCreateResponse) Unmarshal(data []byte) error {
iNdEx = postIndex
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field LeaseId", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field ID", wireType)
}
m.LeaseId = 0
m.ID = 0
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.LeaseId |= (int64(b) & 0x7F) << shift
m.ID |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Ttl", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field TTL", wireType)
}
m.Ttl = 0
m.TTL = 0
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.Ttl |= (int64(b) & 0x7F) << shift
m.TTL |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
@ -4545,16 +4536,16 @@ func (m *LeaseRevokeRequest) Unmarshal(data []byte) error {
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field LeaseId", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field ID", wireType)
}
m.LeaseId = 0
m.ID = 0
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.LeaseId |= (int64(b) & 0x7F) << shift
m.ID |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
@ -4681,16 +4672,16 @@ func (m *LeaseKeepAliveRequest) Unmarshal(data []byte) error {
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field LeaseId", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field ID", wireType)
}
m.LeaseId = 0
m.ID = 0
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.LeaseId |= (int64(b) & 0x7F) << shift
m.ID |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
@ -4772,32 +4763,16 @@ func (m *LeaseKeepAliveResponse) Unmarshal(data []byte) error {
iNdEx = postIndex
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field LeaseId", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field TTL", wireType)
}
m.LeaseId = 0
m.TTL = 0
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.LeaseId |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Ttl", wireType)
}
m.Ttl = 0
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.Ttl |= (int64(b) & 0x7F) << shift
m.TTL |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}

View File

@ -243,19 +243,19 @@ message WatchResponse {
message LeaseCreateRequest {
// advisory ttl in seconds
int64 ttl = 1;
int64 TTL = 1;
}
message LeaseCreateResponse {
ResponseHeader header = 1;
int64 lease_id = 2;
int64 ID = 2;
// server decided ttl in second
int64 ttl = 3;
int64 TTL = 3;
string error = 4;
}
message LeaseRevokeRequest {
int64 lease_id = 1;
int64 ID = 1;
}
message LeaseRevokeResponse {
@ -263,11 +263,10 @@ message LeaseRevokeResponse {
}
message LeaseKeepAliveRequest {
int64 lease_id = 1;
int64 ID = 1;
}
message LeaseKeepAliveResponse {
ResponseHeader header = 1;
int64 lease_id = 2;
int64 ttl = 3;
int64 TTL = 2;
}

View File

@ -34,6 +34,7 @@ import (
"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/pkg/idutil"
"github.com/coreos/etcd/pkg/pbutil"
@ -166,8 +167,9 @@ type EtcdServer struct {
store store.Store
kv dstorage.ConsistentWatchableKV
be backend.Backend
kv dstorage.ConsistentWatchableKV
lessor lease.Lessor
be backend.Backend
stats *stats.ServerStats
lstats *stats.LeaderStats
@ -360,6 +362,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if cfg.V3demo {
srv.be = backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename))
srv.kv = dstorage.New(srv.be, &srv.consistIndex)
srv.lessor = lease.NewLessor(uint8(id), srv.be, srv.kv)
}
// TODO: move transport initialization near the definition of remote
@ -589,6 +592,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
plog.Panicf("rename snapshot file error: %v", err)
}
// TODO: recover leassor
newbe := backend.NewDefaultBackend(fn)
if err := s.kv.Restore(newbe); err != nil {
plog.Panicf("restore KV error: %v", err)

View File

@ -34,6 +34,11 @@ type RaftKV interface {
Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
}
type Lessor interface {
LeaseCreate(ctx context.Context, r *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error)
LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
}
func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Range: r})
if err != nil {
@ -74,6 +79,22 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.
return result.resp.(*pb.CompactionResponse), result.err
}
func (s *EtcdServer) LeaseCreate(ctx context.Context, r *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) {
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{LeaseCreate: r})
if err != nil {
return nil, err
}
return result.resp.(*pb.LeaseCreateResponse), result.err
}
func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{LeaseRevoke: r})
if err != nil {
return nil, err
}
return result.resp.(*pb.LeaseRevokeResponse), result.err
}
type applyResult struct {
resp proto.Message
err error
@ -115,6 +136,7 @@ const (
func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) interface{} {
kv := s.getKV()
le := s.lessor
ar := &applyResult{}
@ -129,6 +151,10 @@ func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) interface{} {
ar.resp, ar.err = applyTxn(kv, r.Txn)
case r.Compaction != nil:
ar.resp, ar.err = applyCompaction(kv, r.Compaction)
case r.LeaseCreate != nil:
ar.resp, ar.err = applyLeaseCreate(le, r.LeaseCreate)
case r.LeaseRevoke != nil:
ar.resp, ar.err = applyLeaseRevoke(le, r.LeaseRevoke)
default:
panic("not implemented")
}
@ -348,6 +374,18 @@ func applyCompare(txnID int64, kv dstorage.KV, c *pb.Compare) (int64, bool) {
return rev, true
}
func applyLeaseCreate(le lease.Lessor, lc *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) {
l := le.Grant(lc.TTL)
return &pb.LeaseCreateResponse{ID: int64(l.ID), TTL: l.TTL}, nil
}
func applyLeaseRevoke(le lease.Lessor, lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
err := le.Revoke(lease.LeaseID(lc.ID))
return &pb.LeaseRevokeResponse{}, err
}
func compareInt64(a, b int64) int {
switch {
case a < b:

View File

@ -48,8 +48,17 @@ type DeleteableRange interface {
DeleteRange(key, end []byte) (int64, int64)
}
// a lessor is the owner of leases. It can grant, revoke,
// renew and modify leases for lessee.
// A Lessor is the owner of leases. It can grant, revoke, renew and modify leases for lessee.
type Lessor interface {
// Grant grants a lease that expires at least after TTL seconds.
Grant(ttl int64) *Lease
// Revoke revokes a lease with given ID. The item attached to the
// given lease will be removed. If the ID does not exist, an error
// will be returned.
Revoke(id LeaseID) error
}
// lessor implements Lessor interface.
// TODO: use clockwork for testability.
type lessor struct {
mu sync.Mutex
@ -59,7 +68,7 @@ type lessor struct {
// We want to make Grant, Revoke, and FindExpired all O(logN) and
// Renew O(1).
// FindExpired and Renew should be the most frequent operations.
leaseMap map[LeaseID]*lease
leaseMap map[LeaseID]*Lease
// A DeleteableRange the lessor operates on.
// When a lease expires, the lessor will delete the
@ -73,9 +82,19 @@ type lessor struct {
idgen *idutil.Generator
}
func NewLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor {
func NewLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) Lessor {
return newLessor(lessorID, b, dr)
}
func newLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor {
// ensure the most significant bit of lessorID is 0.
// so all the IDs generated by id generator will be greater than 0.
if int8(lessorID) < 0 {
lessorID = uint8(-int8(lessorID))
}
l := &lessor{
leaseMap: make(map[LeaseID]*lease),
leaseMap: make(map[LeaseID]*Lease),
b: b,
dr: dr,
idgen: idutil.NewGenerator(lessorID, time.Now()),
@ -85,10 +104,9 @@ func NewLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor {
return l
}
// Grant grants a lease that expires at least after TTL seconds.
// TODO: when lessor is under high load, it should give out lease
// with longer TTL to reduce renew load.
func (le *lessor) Grant(ttl int64) *lease {
func (le *lessor) Grant(ttl int64) *Lease {
// TODO: define max TTL
expiry := time.Now().Add(time.Duration(ttl) * time.Second)
expiry = minExpiry(time.Now(), expiry)
@ -98,7 +116,7 @@ func (le *lessor) Grant(ttl int64) *lease {
le.mu.Lock()
defer le.mu.Unlock()
l := &lease{id: id, ttl: ttl, expiry: expiry, itemSet: make(map[leaseItem]struct{})}
l := &Lease{ID: id, TTL: ttl, expiry: expiry, itemSet: make(map[leaseItem]struct{})}
if _, ok := le.leaseMap[id]; ok {
panic("lease: unexpected duplicate ID!")
}
@ -109,9 +127,6 @@ func (le *lessor) Grant(ttl int64) *lease {
return l
}
// Revoke revokes a lease with given ID. The item attached to the
// given lease will be removed. If the ID does not exist, an error
// will be returned.
func (le *lessor) Revoke(id LeaseID) error {
le.mu.Lock()
defer le.mu.Unlock()
@ -125,7 +140,7 @@ func (le *lessor) Revoke(id LeaseID) error {
le.dr.DeleteRange([]byte(item.key), nil)
}
delete(le.leaseMap, l.id)
delete(le.leaseMap, l.ID)
l.removeFrom(le.b)
return nil
@ -143,7 +158,7 @@ func (le *lessor) Renew(id LeaseID) error {
return fmt.Errorf("lease: cannot find lease %x", id)
}
expiry := time.Now().Add(time.Duration(l.ttl) * time.Second)
expiry := time.Now().Add(time.Duration(l.TTL) * time.Second)
l.expiry = minExpiry(time.Now(), expiry)
return nil
}
@ -166,13 +181,24 @@ func (le *lessor) Attach(id LeaseID, items []leaseItem) error {
return nil
}
// findExpiredLeases loops all the leases in the leaseMap and returns the expired
// leases that needed to be revoked.
func (le *lessor) findExpiredLeases() []*lease {
func (le *lessor) Recover(b backend.Backend, dr DeleteableRange) {
le.mu.Lock()
defer le.mu.Unlock()
leases := make([]*lease, 0, 16)
le.b = b
le.dr = dr
le.leaseMap = make(map[LeaseID]*Lease)
le.initAndRecover()
}
// findExpiredLeases loops all the leases in the leaseMap and returns the expired
// leases that needed to be revoked.
func (le *lessor) findExpiredLeases() []*Lease {
le.mu.Lock()
defer le.mu.Unlock()
leases := make([]*Lease, 0, 16)
now := time.Now()
for _, l := range le.leaseMap {
@ -186,7 +212,7 @@ func (le *lessor) findExpiredLeases() []*lease {
// get gets the lease with given id.
// get is a helper fucntion for testing, at least for now.
func (le *lessor) get(id LeaseID) *lease {
func (le *lessor) get(id LeaseID) *Lease {
le.mu.Lock()
defer le.mu.Unlock()
@ -207,10 +233,10 @@ func (le *lessor) initAndRecover() {
tx.Unlock()
panic("failed to unmarshal lease proto item")
}
id := LeaseID(lpb.ID)
le.leaseMap[id] = &lease{
id: id,
ttl: lpb.TTL,
ID := LeaseID(lpb.ID)
le.leaseMap[ID] = &Lease{
ID: ID,
TTL: lpb.TTL,
// itemSet will be filled in when recover key-value pairs
expiry: minExpiry(time.Now(), time.Now().Add(time.Second*time.Duration(lpb.TTL))),
@ -221,19 +247,19 @@ func (le *lessor) initAndRecover() {
le.b.ForceCommit()
}
type lease struct {
id LeaseID
ttl int64 // time to live in seconds
type Lease struct {
ID LeaseID
TTL int64 // time to live in seconds
itemSet map[leaseItem]struct{}
// expiry time in unixnano
expiry time.Time
}
func (l lease) persistTo(b backend.Backend) {
key := int64ToBytes(int64(l.id))
func (l Lease) persistTo(b backend.Backend) {
key := int64ToBytes(int64(l.ID))
lpb := leasepb.Lease{ID: int64(l.id), TTL: int64(l.ttl)}
lpb := leasepb.Lease{ID: int64(l.ID), TTL: int64(l.TTL)}
val, err := lpb.Marshal()
if err != nil {
panic("failed to marshal lease proto item")
@ -244,8 +270,8 @@ func (l lease) persistTo(b backend.Backend) {
b.BatchTx().Unlock()
}
func (l lease) removeFrom(b backend.Backend) {
key := int64ToBytes(int64(l.id))
func (l Lease) removeFrom(b backend.Backend) {
key := int64ToBytes(int64(l.ID))
b.BatchTx().Lock()
b.BatchTx().UnsafeDelete(leaseBucketName, key)

View File

@ -33,10 +33,10 @@ func TestLessorGrant(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
le := NewLessor(1, be, &fakeDeleteable{})
le := newLessor(1, be, &fakeDeleteable{})
l := le.Grant(1)
gl := le.get(l.id)
gl := le.get(l.ID)
if !reflect.DeepEqual(gl, l) {
t.Errorf("lease = %v, want %v", gl, l)
@ -46,12 +46,12 @@ func TestLessorGrant(t *testing.T) {
}
nl := le.Grant(1)
if nl.id == l.id {
t.Errorf("new lease.id = %x, want != %x", nl.id, l.id)
if nl.ID == l.ID {
t.Errorf("new lease.id = %x, want != %x", nl.ID, l.ID)
}
be.BatchTx().Lock()
_, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(int64(l.id)), nil, 0)
_, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(int64(l.ID)), nil, 0)
if len(vs) != 1 {
t.Errorf("len(vs) = %d, want 1", len(vs))
}
@ -69,7 +69,7 @@ func TestLessorRevoke(t *testing.T) {
fd := &fakeDeleteable{}
le := NewLessor(1, be, fd)
le := newLessor(1, be, fd)
// grant a lease with long term (100 seconds) to
// avoid early termination during the test.
@ -80,18 +80,18 @@ func TestLessorRevoke(t *testing.T) {
{"bar"},
}
err := le.Attach(l.id, items)
err := le.Attach(l.ID, items)
if err != nil {
t.Fatalf("failed to attach items to the lease: %v", err)
}
err = le.Revoke(l.id)
err = le.Revoke(l.ID)
if err != nil {
t.Fatal("failed to revoke lease:", err)
}
if le.get(l.id) != nil {
t.Errorf("got revoked lease %x", l.id)
if le.get(l.ID) != nil {
t.Errorf("got revoked lease %x", l.ID)
}
wdeleted := []string{"foo_", "bar_"}
@ -100,7 +100,7 @@ func TestLessorRevoke(t *testing.T) {
}
be.BatchTx().Lock()
_, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(int64(l.id)), nil, 0)
_, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(int64(l.ID)), nil, 0)
if len(vs) != 0 {
t.Errorf("len(vs) = %d, want 0", len(vs))
}
@ -113,14 +113,14 @@ func TestLessorRenew(t *testing.T) {
defer be.Close()
defer os.RemoveAll(dir)
le := NewLessor(1, be, &fakeDeleteable{})
le := newLessor(1, be, &fakeDeleteable{})
l := le.Grant(5)
// manually change the ttl field
l.ttl = 10
l.TTL = 10
le.Renew(l.id)
l = le.get(l.id)
le.Renew(l.ID)
l = le.get(l.ID)
if l.expiry.Sub(time.Now()) < 9*time.Second {
t.Errorf("failed to renew the lease")
@ -134,20 +134,20 @@ func TestLessorRecover(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
le := NewLessor(1, be, &fakeDeleteable{})
le := newLessor(1, be, &fakeDeleteable{})
l1 := le.Grant(10)
l2 := le.Grant(20)
// Create a new lessor with the same backend
nle := NewLessor(1, be, &fakeDeleteable{})
nl1 := nle.get(l1.id)
if nl1 == nil || nl1.ttl != l1.ttl {
t.Errorf("nl1 = %v, want nl1.TTL= %d", l1.ttl)
nle := newLessor(1, be, &fakeDeleteable{})
nl1 := nle.get(l1.ID)
if nl1 == nil || nl1.TTL != l1.TTL {
t.Errorf("nl1 = %v, want nl1.TTL= %d", nl1.TTL, l1.TTL)
}
nl2 := nle.get(l2.id)
if nl2 == nil || nl2.ttl != l2.ttl {
t.Errorf("nl2 = %v, want nl2.TTL= %d", l2.ttl)
nl2 := nle.get(l2.ID)
if nl2 == nil || nl2.TTL != l2.TTL {
t.Errorf("nl2 = %v, want nl2.TTL= %d", nl2.TTL, l2.TTL)
}
}