clientv3: Use LeaseID in all the client APIs.

In order to use LeaseID type instead of int64 we have to convert
the protobuf lease responses into client lease reponses.
This commit is contained in:
Ajit Yagaty 2016-04-14 17:37:25 -07:00
parent 74153ffa45
commit 06a4086bf9
2 changed files with 48 additions and 19 deletions

View File

@ -39,7 +39,7 @@ func ExampleLease_grant() {
}
// after 5 seconds, the key 'foo' will be removed
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(clientv3.LeaseID(resp.ID)))
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
if err != nil {
log.Fatal(err)
}
@ -60,13 +60,13 @@ func ExampleLease_revoke() {
log.Fatal(err)
}
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(clientv3.LeaseID(resp.ID)))
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
if err != nil {
log.Fatal(err)
}
// revoking lease expires the key attached to its lease ID
_, err = cli.Revoke(context.TODO(), clientv3.LeaseID(resp.ID))
_, err = cli.Revoke(context.TODO(), resp.ID)
if err != nil {
log.Fatal(err)
}
@ -94,13 +94,13 @@ func ExampleLease_keepAlive() {
log.Fatal(err)
}
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(clientv3.LeaseID(resp.ID)))
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
if err != nil {
log.Fatal(err)
}
// the key 'foo' will be kept forever
_, err = cli.KeepAlive(context.TODO(), clientv3.LeaseID(resp.ID))
_, err = cli.KeepAlive(context.TODO(), resp.ID)
if err != nil {
log.Fatal(err)
}
@ -121,13 +121,13 @@ func ExampleLease_keepAliveOnce() {
log.Fatal(err)
}
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(clientv3.LeaseID(resp.ID)))
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
if err != nil {
log.Fatal(err)
}
// to renew the lease only once
_, err = cli.KeepAliveOnce(context.TODO(), clientv3.LeaseID(resp.ID))
_, err = cli.KeepAliveOnce(context.TODO(), resp.ID)
if err != nil {
log.Fatal(err)
}

View File

@ -24,12 +24,25 @@ import (
)
type (
LeaseGrantResponse pb.LeaseGrantResponse
LeaseRevokeResponse pb.LeaseRevokeResponse
LeaseKeepAliveResponse pb.LeaseKeepAliveResponse
LeaseID int64
LeaseRevokeResponse pb.LeaseRevokeResponse
LeaseID int64
)
// LeaseGrantResponse is used to convert the protobuf grant response.
type LeaseGrantResponse struct {
*pb.ResponseHeader
ID LeaseID
TTL int64
Error string
}
// LeaseKeepAliveResponse is used to convert the protobuf keepalive response.
type LeaseKeepAliveResponse struct {
*pb.ResponseHeader
ID LeaseID
TTL int64
}
const (
// a small buffer to store unsent lease responses.
leaseResponseChSize = 16
@ -112,7 +125,13 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err
r := &pb.LeaseGrantRequest{TTL: ttl}
resp, err := l.getRemote().LeaseGrant(cctx, r)
if err == nil {
return (*LeaseGrantResponse)(resp), nil
gresp := &LeaseGrantResponse{
ResponseHeader: resp.GetHeader(),
ID: LeaseID(resp.ID),
TTL: resp.TTL,
Error: resp.Error,
}
return gresp, nil
}
if isHalted(cctx, err) {
return nil, err
@ -245,7 +264,13 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive
if rerr != nil {
return nil, rerr
}
return (*LeaseKeepAliveResponse)(resp), nil
karesp := &LeaseKeepAliveResponse{
ResponseHeader: resp.GetHeader(),
ID: LeaseID(resp.ID),
TTL: resp.TTL,
}
return karesp, nil
}
func (l *lessor) recvKeepAliveLoop() {
@ -286,28 +311,32 @@ func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
// recvKeepAlive updates a lease based on its LeaseKeepAliveResponse
func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
id := LeaseID(resp.ID)
karesp := &LeaseKeepAliveResponse{
ResponseHeader: resp.GetHeader(),
ID: LeaseID(resp.ID),
TTL: resp.TTL,
}
l.mu.Lock()
defer l.mu.Unlock()
ka, ok := l.keepAlives[id]
ka, ok := l.keepAlives[karesp.ID]
if !ok {
return
}
if resp.TTL <= 0 {
if karesp.TTL <= 0 {
// lease expired; close all keep alive channels
delete(l.keepAlives, id)
delete(l.keepAlives, karesp.ID)
ka.Close()
return
}
// send update to all channels
nextDeadline := time.Now().Add(1 + time.Duration(resp.TTL/3)*time.Second)
nextDeadline := time.Now().Add(1 + time.Duration(karesp.TTL/3)*time.Second)
for _, ch := range ka.chs {
select {
case ch <- (*LeaseKeepAliveResponse)(resp):
case ch <- karesp:
ka.deadline = nextDeadline
default:
}