Revert "Merge pull request #7732 from heyitsanthony/lease-err-ka"

This reverts commit fbbc4a4979b256125d2238f6325ad86e7f1b58bd, reversing
changes made to f254e383859a2939d5929346f6595549f424f7c5.

Fixes #7851
Anthony Romano 2017-05-02 09:34:54 -07:00
parent f0ca65a95d
commit 6dd8fb6f24
9 changed files with 144 additions and 137 deletions
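As context for the hunks below: with this revert, clientv3's KeepAlive once again returns an error alongside the response channel, KeepAliveOnce returns (*LeaseKeepAliveResponse, error), and the Err/Deadline fields disappear from LeaseKeepAliveResponse. A minimal usage sketch of the restored API follows; the endpoint, TTL value, and import path github.com/coreos/etcd/clientv3 are illustrative assumptions, not part of this commit.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	// Placeholder endpoint; adjust for a real cluster.
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	grant, err := cli.Grant(context.TODO(), 5)
	if err != nil {
		log.Fatal(err)
	}

	// KeepAlive again returns (<-chan *clientv3.LeaseKeepAliveResponse, error);
	// stream problems surface here or by closing the channel, not via a ka.Err field.
	ch, kaerr := cli.KeepAlive(context.TODO(), grant.ID)
	if kaerr != nil {
		log.Fatal(kaerr)
	}
	for ka := range ch {
		fmt.Println("ttl:", ka.TTL)
	}
}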

View File

@@ -51,9 +51,12 @@ func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
 	}
 	ctx, cancel := context.WithCancel(ops.ctx)
-	keepAlive := client.KeepAlive(ctx, id)
-	donec := make(chan struct{})
+	keepAlive, err := client.KeepAlive(ctx, id)
+	if err != nil || keepAlive == nil {
+		return nil, err
+	}
+	donec := make(chan struct{})
 	s := &Session{client: client, opts: ops, id: id, cancel: cancel, donec: donec}
 	// keep the lease alive until client error or cancelled context

View File

@@ -100,13 +100,12 @@ func ExampleLease_keepAlive() {
 	}
 	// the key 'foo' will be kept forever
-	ch := cli.KeepAlive(context.TODO(), resp.ID)
-	ka := <-ch
-	if ka.Err != nil {
-		log.Fatal(ka.Err)
+	ch, kaerr := cli.KeepAlive(context.TODO(), resp.ID)
+	if kaerr != nil {
+		log.Fatal(kaerr)
 	}
+	ka := <-ch
 	fmt.Println("ttl:", ka.TTL)
 	// Output: ttl: 5
 }
@@ -132,9 +131,9 @@ func ExampleLease_keepAliveOnce() {
 	}
 	// to renew the lease only once
-	ka := cli.KeepAliveOnce(context.TODO(), resp.ID)
-	if ka.Err != nil {
-		log.Fatal(ka.Err)
+	ka, kaerr := cli.KeepAliveOnce(context.TODO(), resp.ID)
+	if kaerr != nil {
+		log.Fatal(kaerr)
 	}
 	fmt.Println("ttl:", ka.TTL)

View File

@@ -104,14 +104,14 @@ func TestLeaseKeepAliveOnce(t *testing.T) {
 		t.Errorf("failed to create lease %v", err)
 	}
-	ka := lapi.KeepAliveOnce(context.Background(), resp.ID)
-	if ka.Err != nil {
-		t.Errorf("failed to keepalive lease %v", ka.Err)
+	_, err = lapi.KeepAliveOnce(context.Background(), resp.ID)
+	if err != nil {
+		t.Errorf("failed to keepalive lease %v", err)
 	}
-	ka = lapi.KeepAliveOnce(context.Background(), clientv3.LeaseID(0))
-	if ka.Err != rpctypes.ErrLeaseNotFound {
-		t.Errorf("expected %v, got %v", rpctypes.ErrLeaseNotFound, ka.Err)
+	_, err = lapi.KeepAliveOnce(context.Background(), clientv3.LeaseID(0))
+	if err != rpctypes.ErrLeaseNotFound {
+		t.Errorf("expected %v, got %v", rpctypes.ErrLeaseNotFound, err)
 	}
 }
@@ -129,7 +129,10 @@ func TestLeaseKeepAlive(t *testing.T) {
 		t.Errorf("failed to create lease %v", err)
 	}
-	rc := lapi.KeepAlive(context.Background(), resp.ID)
+	rc, kerr := lapi.KeepAlive(context.Background(), resp.ID)
+	if kerr != nil {
+		t.Errorf("failed to keepalive lease %v", kerr)
+	}
 	kresp, ok := <-rc
 	if !ok {
@@ -160,7 +163,11 @@ func TestLeaseKeepAliveOneSecond(t *testing.T) {
 	if err != nil {
 		t.Errorf("failed to create lease %v", err)
 	}
-	rc := cli.KeepAlive(context.Background(), resp.ID)
+	rc, kerr := cli.KeepAlive(context.Background(), resp.ID)
+	if kerr != nil {
+		t.Errorf("failed to keepalive lease %v", kerr)
+	}
 	for i := 0; i < 3; i++ {
 		if _, ok := <-rc; !ok {
 			t.Errorf("chan is closed, want not closed")
@@ -186,7 +193,10 @@ func TestLeaseKeepAliveHandleFailure(t *testing.T) {
 		t.Errorf("failed to create lease %v", err)
 	}
-	rc := lapi.KeepAlive(context.Background(), resp.ID)
+	rc, kerr := lapi.KeepAlive(context.Background(), resp.ID)
+	if kerr != nil {
+		t.Errorf("failed to keepalive lease %v", kerr)
+	}
 	kresp := <-rc
 	if kresp.ID != resp.ID {
@@ -220,7 +230,7 @@ func TestLeaseKeepAliveHandleFailure(t *testing.T) {
 type leaseCh struct {
 	lid clientv3.LeaseID
-	ch  clientv3.LeaseKeepAliveChan
+	ch  <-chan *clientv3.LeaseKeepAliveResponse
 }
 // TestLeaseKeepAliveNotFound ensures a revoked lease won't stop other keep alives
@@ -237,7 +247,10 @@ func TestLeaseKeepAliveNotFound(t *testing.T) {
 		if rerr != nil {
 			t.Fatal(rerr)
 		}
-		kach := cli.KeepAlive(context.Background(), resp.ID)
+		kach, kaerr := cli.KeepAlive(context.Background(), resp.ID)
+		if kaerr != nil {
+			t.Fatal(kaerr)
+		}
 		lchs = append(lchs, leaseCh{resp.ID, kach})
 	}
@@ -362,7 +375,10 @@ func TestLeaseKeepAliveCloseAfterDisconnectRevoke(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	rc := cli.KeepAlive(context.Background(), resp.ID)
+	rc, kerr := cli.KeepAlive(context.Background(), resp.ID)
+	if kerr != nil {
+		t.Fatal(kerr)
+	}
 	kresp := <-rc
 	if kresp.ID != resp.ID {
 		t.Fatalf("ID = %x, want %x", kresp.ID, resp.ID)
@@ -381,10 +397,9 @@ func TestLeaseKeepAliveCloseAfterDisconnectRevoke(t *testing.T) {
 	// some keep-alives may still be buffered; drain until close
 	timer := time.After(time.Duration(kresp.TTL) * time.Second)
-	loop := true
-	for loop {
+	for kresp != nil {
 		select {
-		case _, loop = <-rc:
+		case kresp = <-rc:
 		case <-timer:
 			t.Fatalf("keepalive channel did not close")
 		}
@@ -408,7 +423,10 @@ func TestLeaseKeepAliveInitTimeout(t *testing.T) {
 	}
 	// keep client disconnected
 	clus.Members[0].Stop(t)
-	rc := cli.KeepAlive(context.Background(), resp.ID)
+	rc, kerr := cli.KeepAlive(context.Background(), resp.ID)
+	if kerr != nil {
+		t.Fatal(kerr)
+	}
 	select {
 	case ka, ok := <-rc:
 		if ok {
@@ -436,7 +454,10 @@ func TestLeaseKeepAliveTTLTimeout(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	rc := cli.KeepAlive(context.Background(), resp.ID)
+	rc, kerr := cli.KeepAlive(context.Background(), resp.ID)
+	if kerr != nil {
+		t.Fatal(kerr)
+	}
 	if kresp := <-rc; kresp.ID != resp.ID {
 		t.Fatalf("ID = %x, want %x", kresp.ID, resp.ID)
 	}
@@ -559,7 +580,10 @@ func TestLeaseRenewLostQuorum(t *testing.T) {
 	kctx, kcancel := context.WithCancel(context.Background())
 	defer kcancel()
-	ka := cli.KeepAlive(kctx, r.ID)
+	ka, err := cli.KeepAlive(kctx, r.ID)
+	if err != nil {
+		t.Fatal(err)
+	}
 	// consume first keepalive so next message sends when cluster is down
 	<-ka
 	lastKa := time.Now()
@@ -606,9 +630,9 @@ func TestLeaseKeepAliveLoopExit(t *testing.T) {
 	}
 	cli.Close()
-	ka := cli.KeepAlive(ctx, resp.ID)
-	if resp, ok := <-ka; ok {
-		t.Fatalf("expected closed channel, got response %+v", resp)
+	_, err = cli.KeepAlive(ctx, resp.ID)
+	if _, ok := err.(clientv3.ErrKeepAliveHalted); !ok {
+		t.Fatalf("expected %T, got %v(%T)", clientv3.ErrKeepAliveHalted{}, err, err)
 	}
 }
@@ -683,9 +707,15 @@ func TestLeaseWithRequireLeader(t *testing.T) {
 		t.Fatal(err2)
 	}
 	// kaReqLeader close if the leader is lost
-	kaReqLeader := c.KeepAlive(clientv3.WithRequireLeader(context.TODO()), lid1.ID)
+	kaReqLeader, kerr1 := c.KeepAlive(clientv3.WithRequireLeader(context.TODO()), lid1.ID)
+	if kerr1 != nil {
+		t.Fatal(kerr1)
+	}
 	// kaWait will wait even if the leader is lost
-	kaWait := c.KeepAlive(context.TODO(), lid2.ID)
+	kaWait, kerr2 := c.KeepAlive(context.TODO(), lid2.ID)
+	if kerr2 != nil {
+		t.Fatal(kerr2)
+	}
 	select {
 	case <-kaReqLeader:

View File

@@ -43,8 +43,6 @@ type LeaseKeepAliveResponse struct {
 	*pb.ResponseHeader
 	ID  LeaseID
 	TTL int64
-	Err      error
-	Deadline time.Time
 }
 // LeaseTimeToLiveResponse is used to convert the protobuf lease timetolive response.
@@ -72,11 +70,23 @@ const (
 	NoLease LeaseID = 0
 	// retryConnWait is how long to wait before retrying on a lost leader
+	// or keep alive loop failure.
 	retryConnWait = 500 * time.Millisecond
 )
-type LeaseKeepAliveChan <-chan LeaseKeepAliveResponse
+// ErrKeepAliveHalted is returned if client keep alive loop halts with an unexpected error.
+//
+// This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected.
+type ErrKeepAliveHalted struct {
+	Reason error
+}
+
+func (e ErrKeepAliveHalted) Error() string {
+	s := "etcdclient: leases keep alive halted"
+	if e.Reason != nil {
+		s += ": " + e.Reason.Error()
+	}
+	return s
+}
 type Lease interface {
 	// Grant creates a new lease.
@@ -88,24 +98,12 @@ type Lease interface {
 	// TimeToLive retrieves the lease information of the given lease ID.
 	TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
-	// KeepAlive keeps the given lease alive forever. If the keepalive response posted to
-	// the channel is not consumed immediately, the lease client will continue sending keep alive requests
-	// to the etcd server at least every second until latest response is consumed.
-	//
-	// The KeepAlive channel closes if the underlying keep alive stream is interrupted in some
-	// way the client cannot handle itself; the error will be posted in the last keep
-	// alive message before closing. If there is no keepalive response within the
-	// lease's time-out, the channel will close with no error. In most cases calling
-	// KeepAlive again will re-establish keepalives with the target lease if it has not
-	// expired.
-	KeepAlive(ctx context.Context, id LeaseID) LeaseKeepAliveChan
+	// KeepAlive keeps the given lease alive forever.
+	KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
-	// KeepAliveOnce renews the lease once. The response corresponds to the
-	// first message from calling KeepAlive. If the response has a recoverable
-	// error, KeepAliveOnce will retry the RPC with a new keep alive message.
-	//
-	// In most of the cases, Keepalive should be used instead of KeepAliveOnce.
-	KeepAliveOnce(ctx context.Context, id LeaseID) LeaseKeepAliveResponse
+	// KeepAliveOnce renews the lease once. In most of the cases, Keepalive
+	// should be used instead of KeepAliveOnce.
+	KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
 	// Close releases all resources Lease keeps for efficient communication
 	// with the etcd server.
@@ -115,8 +113,9 @@ type Lease interface {
 type lessor struct {
 	mu sync.Mutex // guards all fields
-	// donec is closed when all goroutines are torn down from Close()
+	// donec is closed and loopErr is set when recvKeepAliveLoop stops
 	donec chan struct{}
+	loopErr error
 	remote pb.LeaseClient
@@ -138,7 +137,7 @@ type lessor struct {
 // keepAlive multiplexes a keepalive for a lease over multiple channels
 type keepAlive struct {
-	chs  []chan<- LeaseKeepAliveResponse
+	chs  []chan<- *LeaseKeepAliveResponse
 	ctxs []context.Context
 	// deadline is the time the keep alive channels close if no response
 	deadline time.Time
@@ -220,22 +219,24 @@ func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption
 	}
 }
-func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) LeaseKeepAliveChan {
-	ch := make(chan LeaseKeepAliveResponse, leaseResponseChSize)
+func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
+	ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize)
 	l.mu.Lock()
 	// ensure that recvKeepAliveLoop is still running
 	select {
 	case <-l.donec:
+		err := l.loopErr
+		l.mu.Unlock()
 		close(ch)
-		return ch
+		return ch, ErrKeepAliveHalted{Reason: err}
 	default:
 	}
 	ka, ok := l.keepAlives[id]
 	if !ok {
 		// create fresh keep alive
 		ka = &keepAlive{
-			chs:  []chan<- LeaseKeepAliveResponse{ch},
+			chs:  []chan<- *LeaseKeepAliveResponse{ch},
 			ctxs: []context.Context{ctx},
 			deadline: time.Now().Add(l.firstKeepAliveTimeout),
 			nextKeepAlive: time.Now(),
@@ -251,51 +252,24 @@ func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) LeaseKeepAliveChan {
 	go l.keepAliveCtxCloser(id, ctx, ka.donec)
 	l.firstKeepAliveOnce.Do(func() {
-		go func() {
-			defer func() {
-				l.mu.Lock()
-				for _, ka := range l.keepAlives {
-					ka.Close(nil)
-				}
-				close(l.donec)
-				l.mu.Unlock()
-			}()
-			for l.stopCtx.Err() == nil {
-				err := l.recvKeepAliveLoop()
-				if err == context.Canceled {
-					// canceled by user; no error like WatchChan
-					err = nil
-				}
-				l.mu.Lock()
-				for _, ka := range l.keepAlives {
-					ka.Close(err)
-				}
-				l.keepAlives = make(map[LeaseID]*keepAlive)
-				l.mu.Unlock()
-				select {
-				case <-l.stopCtx.Done():
-				case <-time.After(retryConnWait):
-				}
-			}
-		}()
+		go l.recvKeepAliveLoop()
 		go l.deadlineLoop()
 	})
-	return ch
+	return ch, nil
 }
-func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) LeaseKeepAliveResponse {
+func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
 	for {
-		resp := l.keepAliveOnce(ctx, id)
-		if resp.Err == nil {
+		resp, err := l.keepAliveOnce(ctx, id)
+		if err == nil {
 			if resp.TTL <= 0 {
-				resp.Err = rpctypes.ErrLeaseNotFound
+				err = rpctypes.ErrLeaseNotFound
 			}
-			return resp
+			return resp, err
 		}
-		if isHaltErr(ctx, resp.Err) {
-			return resp
+		if isHaltErr(ctx, err) {
+			return nil, toErr(ctx, err)
 		}
 	}
 }
@@ -365,7 +339,7 @@ func (l *lessor) closeRequireLeader() {
 			continue
 		}
 		// remove all channels that required a leader from keepalive
-		newChs := make([]chan<- LeaseKeepAliveResponse, len(ka.chs)-reqIdxs)
+		newChs := make([]chan<- *LeaseKeepAliveResponse, len(ka.chs)-reqIdxs)
 		newCtxs := make([]context.Context, len(newChs))
 		newIdx := 0
 		for i := range ka.chs {
@@ -379,34 +353,45 @@ func (l *lessor) closeRequireLeader() {
 	}
 }
-func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) LeaseKeepAliveResponse {
+func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
 	cctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 	stream, err := l.remote.LeaseKeepAlive(cctx, grpc.FailFast(false))
 	if err != nil {
-		return LeaseKeepAliveResponse{Err: toErr(ctx, err)}
+		return nil, toErr(ctx, err)
 	}
 	err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)})
 	if err != nil {
-		return LeaseKeepAliveResponse{Err: toErr(ctx, err)}
+		return nil, toErr(ctx, err)
 	}
 	resp, rerr := stream.Recv()
 	if rerr != nil {
-		return LeaseKeepAliveResponse{Err: toErr(ctx, rerr)}
+		return nil, toErr(ctx, rerr)
 	}
-	return LeaseKeepAliveResponse{
+	karesp := &LeaseKeepAliveResponse{
 		ResponseHeader: resp.GetHeader(),
 		ID:             LeaseID(resp.ID),
 		TTL:            resp.TTL,
-		Deadline:       time.Now().Add(time.Duration(resp.TTL) * time.Second),
 	}
+	return karesp, nil
 }
 func (l *lessor) recvKeepAliveLoop() (gerr error) {
+	defer func() {
+		l.mu.Lock()
+		close(l.donec)
+		l.loopErr = gerr
+		for _, ka := range l.keepAlives {
+			ka.Close()
+		}
+		l.keepAlives = make(map[LeaseID]*keepAlive)
+		l.mu.Unlock()
+	}()
 	stream, serr := l.resetRecv()
 	for serr == nil {
 		resp, err := stream.Recv()
@@ -458,7 +443,6 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
 		ResponseHeader: resp.GetHeader(),
 		ID:             LeaseID(resp.ID),
 		TTL:            resp.TTL,
-		Deadline:       time.Now().Add(time.Duration(resp.TTL) * time.Second),
 	}
 	l.mu.Lock()
@@ -472,7 +456,7 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
 	if karesp.TTL <= 0 {
 		// lease expired; close all keep alive channels
 		delete(l.keepAlives, karesp.ID)
-		ka.Close(nil)
+		ka.Close()
 		return
 	}
@@ -481,7 +465,7 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
 	ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
 	for _, ch := range ka.chs {
 		select {
-		case ch <- *karesp:
+		case ch <- karesp:
 			ka.nextKeepAlive = nextKeepAlive
 		default:
 		}
@@ -502,7 +486,7 @@ func (l *lessor) deadlineLoop() {
 		for id, ka := range l.keepAlives {
 			if ka.deadline.Before(now) {
 				// waited too long for response; lease may be expired
-				ka.Close(nil)
+				ka.Close()
 				delete(l.keepAlives, id)
 			}
 		}
@@ -544,18 +528,9 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
 	}
 }
-func (ka *keepAlive) Close(err error) {
+func (ka *keepAlive) Close() {
 	close(ka.donec)
 	for _, ch := range ka.chs {
-		if err != nil {
-			// try to post error if buffer space available
-			select {
-			case ch <- LeaseKeepAliveResponse{Err: err}:
-			default:
-			}
-		}
 		close(ch)
 	}
-	// so keepAliveCtxClose doesn't double-close ka.chs
-	ka.chs, ka.ctxs = nil, nil
 }
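A behavioral consequence visible in the TestLeaseKeepAliveLoopExit hunk above: once the internal keep-alive loop has stopped (for example after the client is closed), KeepAlive reports this via an ErrKeepAliveHalted value rather than posting an error on the channel. A hedged sketch of how a caller might handle that case follows; the package name, the helper keepLeaseAlive, and the surrounding setup are assumptions for illustration, not part of this commit.

package leaseutil

import (
	"context"
	"fmt"

	"github.com/coreos/etcd/clientv3"
)

// keepLeaseAlive is an illustrative helper showing how callers can distinguish
// a halted keep-alive loop (ErrKeepAliveHalted, where KeepAliveOnce still
// works) from other errors returned by KeepAlive.
func keepLeaseAlive(ctx context.Context, cli *clientv3.Client, id clientv3.LeaseID) error {
	ch, err := cli.KeepAlive(ctx, id)
	if err != nil {
		if halted, ok := err.(clientv3.ErrKeepAliveHalted); ok {
			return fmt.Errorf("keep-alive loop halted: %v", halted.Reason)
		}
		return err
	}
	for ka := range ch {
		fmt.Println("lease renewed, ttl:", ka.TTL)
	}
	// channel closed: context canceled, lease expired, or stream lost
	return nil
}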

View File

@@ -148,12 +148,13 @@ func leaseKeepAliveCommandFunc(cmd *cobra.Command, args []string) {
 	}
 	id := leaseFromArgs(args[0])
-	respc := mustClientFromCmd(cmd).KeepAlive(context.TODO(), id)
-	for resp := range respc {
-		if resp.Err != nil {
-			ExitWithError(ExitError, resp.Err)
-		}
-		display.KeepAlive(resp)
+	respc, kerr := mustClientFromCmd(cmd).KeepAlive(context.TODO(), id)
+	if kerr != nil {
+		ExitWithError(ExitBadConnection, kerr)
+	}
+	for resp := range respc {
+		display.KeepAlive(*resp)
 	}
 	if _, ok := (display).(*simplePrinter); ok {

View File

@@ -75,7 +75,6 @@ type proxyCloser struct {
 	clientv3.Watcher
 	wdonec  <-chan struct{}
 	kvdonec <-chan struct{}
-	lclose  func()
 	lpdonec <-chan struct{}
 }
@@ -84,7 +83,6 @@ func (pc *proxyCloser) Close() error {
 	<-pc.kvdonec
 	err := pc.Watcher.Close()
 	<-pc.wdonec
-	pc.lclose()
 	<-pc.lpdonec
 	return err
 }
@@ -97,13 +95,11 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) {
 	rpc := toGRPC(c)
 	c.KV = clientv3.NewKVFromKVClient(rpc.KV)
 	pmu.Lock()
-	lc := c.Lease
 	c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, cfg.DialTimeout)
 	c.Watcher = &proxyCloser{
 		Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch),
 		wdonec:  proxies[c].wdonec,
 		kvdonec: proxies[c].kvdonec,
-		lclose:  func() { lc.Close() },
 		lpdonec: proxies[c].lpdonec,
 	}
 	pmu.Unlock()

View File

@@ -255,7 +255,10 @@ func (lps *leaseProxyStream) recvLoop() error {
 func (lps *leaseProxyStream) keepAliveLoop(leaseID int64, neededResps *atomicCounter) error {
 	cctx, ccancel := context.WithCancel(lps.ctx)
 	defer ccancel()
-	respc := lps.lessor.KeepAlive(cctx, clientv3.LeaseID(leaseID))
+	respc, err := lps.lessor.KeepAlive(cctx, clientv3.LeaseID(leaseID))
+	if err != nil {
+		return err
+	}
 	// ticker expires when loop hasn't received keepalive within TTL
 	var ticker <-chan time.Time
 	for {
@@ -273,7 +276,7 @@ func (lps *leaseProxyStream) keepAliveLoop(leaseID int64, neededResps *atomicCounter) error {
 			lps.mu.Unlock()
 			return nil
 		case rp, ok := <-respc:
-			if !ok || rp.Err != nil {
+			if !ok {
 				lps.mu.Lock()
 				delete(lps.keepAliveLeases, leaseID)
 				lps.mu.Unlock()

View File

@@ -61,8 +61,8 @@ func leaseKeepaliveFunc(cmd *cobra.Command, args []string) {
 		}
 		for range requests {
 			st := time.Now()
-			ka := c.KeepAliveOnce(context.TODO(), resp.ID)
-			r.Results() <- report.Result{Err: ka.Err, Start: st, End: time.Now()}
+			_, err := c.KeepAliveOnce(context.TODO(), resp.ID)
+			r.Results() <- report.Result{Err: err, Start: st, End: time.Now()}
 			bar.Increment()
 		}
 	}(clients[i])

View File

@@ -56,6 +56,7 @@ func runLeaseRenewerFunc(cmd *cobra.Command, args []string) {
 	for {
 		var (
 			l   *clientv3.LeaseGrantResponse
+			lk  *clientv3.LeaseKeepAliveResponse
 			err error
 		)
 		for {
@@ -67,14 +68,13 @@
 		expire := time.Now().Add(time.Duration(l.TTL-1) * time.Second)
 		for {
-			lk := c.Lease.KeepAliveOnce(ctx, l.ID)
-			err = lk.Err
+			lk, err = c.Lease.KeepAliveOnce(ctx, l.ID)
 			if grpc.Code(err) == codes.NotFound {
 				if time.Since(expire) < 0 {
 					log.Fatalf("bad renew! exceeded: %v", time.Since(expire))
 					for {
-						lk = c.Lease.KeepAliveOnce(ctx, l.ID)
-						fmt.Println(lk)
+						lk, err = c.Lease.KeepAliveOnce(ctx, l.ID)
+						fmt.Println(lk, err)
 						time.Sleep(time.Second)
 					}
 				}