Merge pull request #8358 from gyuho/lease-list

api: lease list
This commit is contained in:
Gyu-Ho Lee 2017-08-14 14:32:03 -07:00 committed by GitHub
commit deb0098d33
21 changed files with 1169 additions and 340 deletions

View File

@ -58,6 +58,7 @@ This is a generated documentation. Please read the proto files for more.
| LeaseRevoke | LeaseRevokeRequest | LeaseRevokeResponse | LeaseRevoke revokes a lease. All keys attached to the lease will expire and be deleted. |
| LeaseKeepAlive | LeaseKeepAliveRequest | LeaseKeepAliveResponse | LeaseKeepAlive keeps the lease alive by streaming keep alive requests from the client to the server and streaming keep alive responses from the server to the client. |
| LeaseTimeToLive | LeaseTimeToLiveRequest | LeaseTimeToLiveResponse | LeaseTimeToLive retrieves lease information. |
| LeaseLeases | LeaseLeasesRequest | LeaseLeasesResponse | LeaseLeases lists all existing leases. |
@ -513,6 +514,21 @@ Empty field.
##### message `LeaseLeasesRequest` (etcdserver/etcdserverpb/rpc.proto)
Empty field.
##### message `LeaseLeasesResponse` (etcdserver/etcdserverpb/rpc.proto)
| Field | Description | Type |
| ----- | ----------- | ---- |
| header | | ResponseHeader |
| leases | | (slice of) LeaseStatus |
##### message `LeaseRevokeRequest` (etcdserver/etcdserverpb/rpc.proto)
| Field | Description | Type |
@ -529,6 +545,14 @@ Empty field.
##### message `LeaseStatus` (etcdserver/etcdserverpb/rpc.proto)
| Field | Description | Type |
| ----- | ----------- | ---- |
| ID | | int64 |
##### message `LeaseTimeToLiveRequest` (etcdserver/etcdserverpb/rpc.proto)
| Field | Description | Type |

View File

@ -609,6 +609,33 @@
}
}
},
"/v3alpha/kv/lease/leases": {
"post": {
"tags": [
"Lease"
],
"summary": "LeaseLeases lists all existing leases.",
"operationId": "LeaseLeases",
"parameters": [
{
"name": "body",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/etcdserverpbLeaseLeasesRequest"
}
}
],
"responses": {
"200": {
"description": "(empty)",
"schema": {
"$ref": "#/definitions/etcdserverpbLeaseLeasesResponse"
}
}
}
}
},
"/v3alpha/kv/lease/revoke": {
"post": {
"tags": [
@ -1666,6 +1693,23 @@
}
}
},
"etcdserverpbLeaseLeasesRequest": {
"type": "object"
},
"etcdserverpbLeaseLeasesResponse": {
"type": "object",
"properties": {
"header": {
"$ref": "#/definitions/etcdserverpbResponseHeader"
},
"leases": {
"type": "array",
"items": {
"$ref": "#/definitions/etcdserverpbLeaseStatus"
}
}
}
},
"etcdserverpbLeaseRevokeRequest": {
"type": "object",
"properties": {
@ -1684,6 +1728,15 @@
}
}
},
"etcdserverpbLeaseStatus": {
"type": "object",
"properties": {
"ID": {
"type": "string",
"format": "int64"
}
}
},
"etcdserverpbLeaseTimeToLiveRequest": {
"type": "object",
"properties": {

View File

@ -574,6 +574,37 @@ func TestLeaseTimeToLiveLeaseNotFound(t *testing.T) {
}
}
func TestLeaseLeases(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.RandClient()
ids := []clientv3.LeaseID{}
for i := 0; i < 5; i++ {
resp, err := cli.Grant(context.Background(), 10)
if err != nil {
t.Errorf("failed to create lease %v", err)
}
ids = append(ids, resp.ID)
}
resp, err := cli.Leases(context.Background())
if err != nil {
t.Fatal(err)
}
if len(resp.Leases) != 5 {
t.Fatalf("len(resp.Leases) expected 5, got %d", len(resp.Leases))
}
for i := range resp.Leases {
if ids[i] != resp.Leases[i].ID {
t.Fatalf("#%d: lease ID expected %d, got %d", i, ids[i], resp.Leases[i].ID)
}
}
}
// TestLeaseRenewLostQuorum ensures keepalives work after losing quorum
// for a while.
func TestLeaseRenewLostQuorum(t *testing.T) {

View File

@ -60,6 +60,18 @@ type LeaseTimeToLiveResponse struct {
Keys [][]byte `json:"keys"`
}
// LeaseStatus represents a lease status.
type LeaseStatus struct {
ID LeaseID `json:"id"`
// TODO: TTL int64
}
// LeaseLeasesResponse is used to convert the protobuf lease list response.
type LeaseLeasesResponse struct {
*pb.ResponseHeader
Leases []LeaseStatus `json:"leases"`
}
const (
// defaultTTL is the assumed lease TTL used for the first keepalive
// deadline before the actual TTL is known to the client.
@ -98,6 +110,9 @@ type Lease interface {
// TimeToLive retrieves the lease information of the given lease ID.
TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
// Leases retrieves all leases.
Leases(ctx context.Context) (*LeaseLeasesResponse, error)
// KeepAlive keeps the given lease alive forever.
KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
@ -219,6 +234,22 @@ func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption
}
}
func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) {
for {
resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{}, grpc.FailFast(false))
if err == nil {
leases := make([]LeaseStatus, len(resp.Leases))
for i := range resp.Leases {
leases[i] = LeaseStatus{ID: LeaseID(resp.Leases[i].ID)}
}
return &LeaseLeasesResponse{ResponseHeader: resp.GetHeader(), Leases: leases}, nil
}
if isHaltErr(ctx, err) {
return nil, toErr(ctx, err)
}
}
}
func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize)

View File

@ -22,6 +22,7 @@ import (
)
func TestCtlV3LeaseGrantTimeToLive(t *testing.T) { testCtl(t, leaseTestGrantTimeToLive) }
func TestCtlV3LeaseGrantLeases(t *testing.T) { testCtl(t, leaseTestGrantLeasesList) }
func TestCtlV3LeaseKeepAlive(t *testing.T) { testCtl(t, leaseTestKeepAlive) }
func TestCtlV3LeaseRevoke(t *testing.T) { testCtl(t, leaseTestRevoke) }
@ -51,6 +52,26 @@ func leaseTestGrantTimeToLive(cx ctlCtx) {
}
}
func leaseTestGrantLeasesList(cx ctlCtx) {
id, err := ctlV3LeaseGrant(cx, 10)
if err != nil {
cx.t.Fatal(err)
}
cmdArgs := append(cx.PrefixArgs(), "lease", "list")
proc, err := spawnCmd(cmdArgs)
if err != nil {
cx.t.Fatal(err)
}
_, err = proc.Expect(id)
if err != nil {
cx.t.Fatal(err)
}
if err = proc.Close(); err != nil {
cx.t.Fatal(err)
}
}
func leaseTestKeepAlive(cx ctlCtx) {
// put with TTL 10 seconds and keep-alive
leaseID, err := ctlV3LeaseGrant(cx, 10)

View File

@ -439,6 +439,26 @@ Prints lease information.
# {"cluster_id":17186838941855831277,"member_id":4845372305070271874,"revision":3,"raft_term":2,"id":3279279168933706764,"ttl":459,"granted-ttl":500,"keys":["Zm9vMQ==","Zm9vMg=="]}
```
### LEASE LIST
LEASE LIST lists all active leases.
RPC: LeaseLeases
#### Output
Prints a message with a list of active leases.
#### Example
```bash
./etcdctl lease grant 10
# lease 32695410dcc0ca06 granted with TTL(10s)
./etcdctl lease list
32695410dcc0ca06
```
### LEASE KEEP-ALIVE \<leaseID\>
LEASE KEEP-ALIVE periodically refreshes a lease so it does not expire.

View File

@ -33,6 +33,7 @@ func NewLeaseCommand() *cobra.Command {
lc.AddCommand(NewLeaseGrantCommand())
lc.AddCommand(NewLeaseRevokeCommand())
lc.AddCommand(NewLeaseTimeToLiveCommand())
lc.AddCommand(NewLeaseListCommand())
lc.AddCommand(NewLeaseKeepAliveCommand())
return lc
@ -129,6 +130,25 @@ func leaseTimeToLiveCommandFunc(cmd *cobra.Command, args []string) {
display.TimeToLive(*resp, timeToLiveKeys)
}
// NewLeaseListCommand returns the cobra command for "lease list".
func NewLeaseListCommand() *cobra.Command {
lc := &cobra.Command{
Use: "list",
Short: "List all active leases",
Run: leaseListCommandFunc,
}
return lc
}
// leaseListCommandFunc executes the "lease list" command.
func leaseListCommandFunc(cmd *cobra.Command, args []string) {
resp, rerr := mustClientFromCmd(cmd).Leases(context.TODO())
if rerr != nil {
ExitWithError(ExitBadConnection, rerr)
}
display.Leases(*resp)
}
// NewLeaseKeepAliveCommand returns the cobra command for "lease keep-alive".
func NewLeaseKeepAliveCommand() *cobra.Command {
lc := &cobra.Command{

View File

@ -36,6 +36,7 @@ type printer interface {
Revoke(id v3.LeaseID, r v3.LeaseRevokeResponse)
KeepAlive(r v3.LeaseKeepAliveResponse)
TimeToLive(r v3.LeaseTimeToLiveResponse, keys bool)
Leases(r v3.LeaseLeasesResponse)
MemberAdd(v3.MemberAddResponse)
MemberRemove(id uint64, r v3.MemberRemoveResponse)
@ -96,6 +97,7 @@ func (p *printerRPC) Grant(r v3.LeaseGrantResponse) { p.p(r
func (p *printerRPC) Revoke(id v3.LeaseID, r v3.LeaseRevokeResponse) { p.p(r) }
func (p *printerRPC) KeepAlive(r v3.LeaseKeepAliveResponse) { p.p(r) }
func (p *printerRPC) TimeToLive(r v3.LeaseTimeToLiveResponse, keys bool) { p.p(&r) }
func (p *printerRPC) Leases(r v3.LeaseLeasesResponse) { p.p(&r) }
func (p *printerRPC) MemberAdd(r v3.MemberAddResponse) { p.p((*pb.MemberAddResponse)(&r)) }
func (p *printerRPC) MemberRemove(id uint64, r v3.MemberRemoveResponse) {

View File

@ -118,6 +118,13 @@ func (p *fieldsPrinter) TimeToLive(r v3.LeaseTimeToLiveResponse, keys bool) {
}
}
func (p *fieldsPrinter) Leases(r v3.LeaseLeasesResponse) {
p.hdr(r.ResponseHeader)
for _, item := range r.Leases {
fmt.Println(`"ID" :`, item.ID)
}
}
func (p *fieldsPrinter) MemberList(r v3.MemberListResponse) {
p.hdr(r.Header)
for _, m := range r.Members {

View File

@ -104,6 +104,13 @@ func (s *simplePrinter) TimeToLive(resp v3.LeaseTimeToLiveResponse, keys bool) {
fmt.Println(txt)
}
func (s *simplePrinter) Leases(resp v3.LeaseLeasesResponse) {
fmt.Printf("found %d leases\n", len(resp.Leases))
for _, item := range resp.Leases {
fmt.Printf("%016x\n", item.ID)
}
}
func (s *simplePrinter) Alarm(resp v3.AlarmResponse) {
for _, e := range resp.Alarms {
fmt.Printf("%+v\n", e)

View File

@ -68,6 +68,21 @@ func (ls *LeaseServer) LeaseTimeToLive(ctx context.Context, rr *pb.LeaseTimeToLi
return resp, nil
}
func (ls *LeaseServer) LeaseLeases(ctx context.Context, rr *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) {
resp, err := ls.le.LeaseLeases(ctx, rr)
if err != nil && err != lease.ErrLeaseNotFound {
return nil, togRPCError(err)
}
if err == lease.ErrLeaseNotFound {
resp = &pb.LeaseLeasesResponse{
Header: &pb.ResponseHeader{},
Leases: []*pb.LeaseStatus{},
}
}
ls.hdr.fill(resp.Header)
return resp, nil
}
func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) (err error) {
errc := make(chan error, 1)
go func() {

View File

@ -49,6 +49,9 @@
LeaseKeepAliveResponse
LeaseTimeToLiveRequest
LeaseTimeToLiveResponse
LeaseLeasesRequest
LeaseStatus
LeaseLeasesResponse
Member
MemberAddRequest
MemberAddResponse

View File

@ -236,6 +236,19 @@ func request_Lease_LeaseTimeToLive_0(ctx context.Context, marshaler runtime.Mars
}
func request_Lease_LeaseLeases_0(ctx context.Context, marshaler runtime.Marshaler, client etcdserverpb.LeaseClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq etcdserverpb.LeaseLeasesRequest
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil {
return nil, metadata, grpc.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.LeaseLeases(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func request_Cluster_MemberAdd_0(ctx context.Context, marshaler runtime.Marshaler, client etcdserverpb.ClusterClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq etcdserverpb.MemberAddRequest
var metadata runtime.ServerMetadata
@ -1003,6 +1016,34 @@ func RegisterLeaseHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc
})
mux.Handle("POST", pattern_Lease_LeaseLeases_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if cn, ok := w.(http.CloseNotifier); ok {
go func(done <-chan struct{}, closed <-chan bool) {
select {
case <-done:
case <-closed:
cancel()
}
}(ctx.Done(), cn.CloseNotify())
}
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
rctx, err := runtime.AnnotateContext(ctx, req)
if err != nil {
runtime.HTTPError(ctx, outboundMarshaler, w, req, err)
}
resp, md, err := request_Lease_LeaseLeases_0(rctx, inboundMarshaler, client, req, pathParams)
ctx = runtime.NewServerMetadataContext(ctx, md)
if err != nil {
runtime.HTTPError(ctx, outboundMarshaler, w, req, err)
return
}
forward_Lease_LeaseLeases_0(ctx, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
@ -1014,6 +1055,8 @@ var (
pattern_Lease_LeaseKeepAlive_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v3alpha", "lease", "keepalive"}, ""))
pattern_Lease_LeaseTimeToLive_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3}, []string{"v3alpha", "kv", "lease", "timetolive"}, ""))
pattern_Lease_LeaseLeases_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3}, []string{"v3alpha", "kv", "lease", "leases"}, ""))
)
var (
@ -1024,6 +1067,8 @@ var (
forward_Lease_LeaseKeepAlive_0 = runtime.ForwardResponseStream
forward_Lease_LeaseTimeToLive_0 = runtime.ForwardResponseMessage
forward_Lease_LeaseLeases_0 = runtime.ForwardResponseMessage
)
// RegisterClusterHandlerFromEndpoint is same as RegisterClusterHandler but

File diff suppressed because it is too large Load Diff

View File

@ -112,7 +112,13 @@ service Lease {
};
}
// TODO(xiangli) List all existing Leases?
// LeaseLeases lists all existing leases.
rpc LeaseLeases(LeaseLeasesRequest) returns (LeaseLeasesResponse) {
option (google.api.http) = {
post: "/v3alpha/kv/lease/leases"
body: "*"
};
}
}
service Cluster {
@ -757,6 +763,19 @@ message LeaseTimeToLiveResponse {
repeated bytes keys = 5;
}
message LeaseLeasesRequest {
}
message LeaseStatus {
int64 ID = 1;
// TODO: int64 TTL = 2;
}
message LeaseLeasesResponse {
ResponseHeader header = 1;
repeated LeaseStatus leases = 2;
}
message Member {
// ID is the member ID for this member.
uint64 ID = 1;

View File

@ -60,6 +60,9 @@ type Lessor interface {
// LeaseTimeToLive retrieves lease information.
LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error)
// LeaseLeases lists all leases.
LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error)
}
type Authenticator interface {
@ -291,6 +294,15 @@ func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR
return nil, ErrTimeout
}
func (s *EtcdServer) LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) {
ls := s.lessor.Leases()
lss := make([]*pb.LeaseStatus, len(ls))
for i := range ls {
lss[i] = &pb.LeaseStatus{ID: int64(ls[i].ID)}
}
return &pb.LeaseLeasesResponse{Header: newHeader(s), Leases: lss}, nil
}
func (s *EtcdServer) waitLeader(ctx context.Context) (*membership.Member, error) {
leader := s.cluster.Member(s.Leader())
for leader == nil {

View File

@ -234,6 +234,43 @@ func TestV3LeaseExists(t *testing.T) {
}
}
// TestV3LeaseLeases creates leases and confirms list RPC fetches created ones.
func TestV3LeaseLeases(t *testing.T) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
ctx0, cancel0 := context.WithCancel(context.Background())
defer cancel0()
// create leases
ids := []int64{}
for i := 0; i < 5; i++ {
lresp, err := toGRPC(clus.RandClient()).Lease.LeaseGrant(
ctx0,
&pb.LeaseGrantRequest{TTL: 30})
if err != nil {
t.Fatal(err)
}
if lresp.Error != "" {
t.Fatal(lresp.Error)
}
ids = append(ids, lresp.ID)
}
lresp, err := toGRPC(clus.RandClient()).Lease.LeaseLeases(
context.Background(),
&pb.LeaseLeasesRequest{})
if err != nil {
t.Fatal(err)
}
for i := range lresp.Leases {
if lresp.Leases[i].ID != ids[i] {
t.Fatalf("#%d: lease ID expected %d, got %d", i, ids[i], lresp.Leases[i].ID)
}
}
}
// TestV3LeaseRenewStress keeps creating lease and renewing it immediately to ensure the renewal goes through.
// it was oberserved that the immediate lease renewal after granting a lease from follower resulted lease not found.
// related issue https://github.com/coreos/etcd/issues/6978

View File

@ -99,6 +99,9 @@ type Lessor interface {
// Lookup gives the lease at a given lease id, if any
Lookup(id LeaseID) *Lease
// Leases lists all leases.
Leases() []*Lease
// ExpiredLeasesC returns a chan that is used to receive expired leases.
ExpiredLeasesC() <-chan []*Lease
@ -317,6 +320,22 @@ func (le *lessor) Lookup(id LeaseID) *Lease {
return le.leaseMap[id]
}
func (le *lessor) unsafeLeases() []*Lease {
leases := make([]*Lease, 0, len(le.leaseMap))
for _, l := range le.leaseMap {
leases = append(leases, l)
}
sort.Sort(leasesByExpiry(leases))
return leases
}
func (le *lessor) Leases() []*Lease {
le.mu.Lock()
ls := le.unsafeLeases()
le.mu.Unlock()
return ls
}
func (le *lessor) Promote(extend time.Duration) {
le.mu.Lock()
defer le.mu.Unlock()
@ -334,11 +353,7 @@ func (le *lessor) Promote(extend time.Duration) {
}
// adjust expiries in case of overlap
leases := make([]*Lease, 0, len(le.leaseMap))
for _, l := range le.leaseMap {
leases = append(leases, l)
}
sort.Sort(leasesByExpiry(leases))
leases := le.unsafeLeases()
baseWindow := leases[0].Remaining()
nextWindow := baseWindow + time.Second
@ -636,6 +651,8 @@ func (fl *FakeLessor) Renew(id LeaseID) (int64, error) { return 10, nil }
func (le *FakeLessor) Lookup(id LeaseID) *Lease { return nil }
func (le *FakeLessor) Leases() []*Lease { return nil }
func (fl *FakeLessor) ExpiredLeasesC() <-chan []*Lease { return nil }
func (fl *FakeLessor) Recover(b backend.Backend, rd RangeDeleter) {}

View File

@ -72,6 +72,17 @@ func TestLessorGrant(t *testing.T) {
t.Errorf("new lease.id = %x, want != %x", nl.ID, l.ID)
}
lss := []*Lease{gl, nl}
leases := le.Leases()
for i := range lss {
if lss[i].ID != leases[i].ID {
t.Fatalf("lease ID expected %d, got %d", lss[i].ID, leases[i].ID)
}
if lss[i].ttl != leases[i].ttl {
t.Fatalf("ttl expected %d, got %d", lss[i].ttl, leases[i].ttl)
}
}
be.BatchTx().Lock()
_, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(int64(l.ID)), nil, 0)
if len(vs) != 1 {

View File

@ -48,6 +48,10 @@ func (c *ls2lc) LeaseTimeToLive(ctx context.Context, in *pb.LeaseTimeToLiveReque
return c.leaseServer.LeaseTimeToLive(ctx, in)
}
func (c *ls2lc) LeaseLeases(ctx context.Context, in *pb.LeaseLeasesRequest, opts ...grpc.CallOption) (*pb.LeaseLeasesResponse, error) {
return c.leaseServer.LeaseLeases(ctx, in)
}
// ls2lcClientStream implements Lease_LeaseKeepAliveClient
type ls2lcClientStream struct{ chanClientStream }

View File

@ -113,6 +113,22 @@ func (lp *leaseProxy) LeaseTimeToLive(ctx context.Context, rr *pb.LeaseTimeToLiv
return rp, err
}
func (lp *leaseProxy) LeaseLeases(ctx context.Context, rr *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) {
r, err := lp.lessor.Leases(ctx)
if err != nil {
return nil, err
}
leases := make([]*pb.LeaseStatus, len(r.Leases))
for i := range r.Leases {
leases[i] = &pb.LeaseStatus{ID: int64(r.Leases[i].ID)}
}
rp := &pb.LeaseLeasesResponse{
Header: r.ResponseHeader,
Leases: leases,
}
return rp, err
}
func (lp *leaseProxy) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error {
lp.mu.Lock()
select {