diff --git a/integration/v3_lease_test.go b/integration/v3_lease_test.go
index 31672868d..ce5f40089 100644
--- a/integration/v3_lease_test.go
+++ b/integration/v3_lease_test.go
@@ -306,6 +306,65 @@ func TestV3LeaseSwitch(t *testing.T) {
 	}
 }
 
+// TestV3LeaseFailover ensures that the old leader drops lease keepalive requests
+// within the election timeout after it loses quorum, and that the new leader
+// extends the TTL of the lease to at least TTL + election timeout.
+func TestV3LeaseFailover(t *testing.T) {
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	toIsolate := clus.waitLeader(t, clus.Members)
+
+	lc := toGRPC(clus.Client(toIsolate)).Lease
+
+	// create lease
+	lresp, err := lc.LeaseCreate(context.TODO(), &pb.LeaseCreateRequest{TTL: 5})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if lresp.Error != "" {
+		t.Fatal(lresp.Error)
+	}
+
+	// isolate the current leader from its followers.
+	clus.Members[toIsolate].Pause()
+
+	lreq := &pb.LeaseKeepAliveRequest{ID: lresp.ID}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	lac, err := lc.LeaseKeepAlive(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer lac.CloseSend()
+
+	// send keepalives to the old leader until it starts
+	// to drop the lease requests.
+	var expectedExp time.Time
+	for {
+		if err = lac.Send(lreq); err != nil {
+			break
+		}
+		lkresp, rxerr := lac.Recv()
+		if rxerr != nil {
+			break
+		}
+		expectedExp = time.Now().Add(time.Duration(lkresp.TTL) * time.Second)
+		time.Sleep(time.Duration(lkresp.TTL/2) * time.Second)
+	}
+
+	clus.Members[toIsolate].Resume()
+	clus.waitLeader(t, clus.Members)
+
+	// the lease should not expire before the last received keepalive deadline.
+	time.Sleep(expectedExp.Sub(time.Now()) - 500*time.Millisecond)
+
+	if !leaseExist(t, clus, lresp.ID) {
+		t.Error("lease expired unexpectedly")
+	}
+}
+
 // acquireLeaseAndKey creates a new lease and creates an attached key.
 func acquireLeaseAndKey(clus *ClusterV3, key string) (int64, error) {
 	// create lease
diff --git a/rafthttp/peer.go b/rafthttp/peer.go
index 8c804b4d7..1c09a28b9 100644
--- a/rafthttp/peer.go
+++ b/rafthttp/peer.go
@@ -217,6 +217,8 @@ func (p *peer) Pause() {
 	p.mu.Lock()
 	defer p.mu.Unlock()
 	p.paused = true
+	p.msgAppReader.pause()
+	p.msgAppV2Reader.pause()
 }
 
 // Resume resumes a paused peer.
@@ -224,6 +226,8 @@ func (p *peer) Resume() {
 	p.mu.Lock()
 	defer p.mu.Unlock()
 	p.paused = false
+	p.msgAppReader.resume()
+	p.msgAppV2Reader.resume()
 }
 
 func (p *peer) stop() {
diff --git a/rafthttp/stream.go b/rafthttp/stream.go
index 5ac4d797d..cc161373b 100644
--- a/rafthttp/stream.go
+++ b/rafthttp/stream.go
@@ -252,6 +252,7 @@ type streamReader struct {
 	errorc chan<- error
 
 	mu     sync.Mutex
+	paused bool
 	cancel func()
 	closer io.Closer
 	stopc  chan struct{}
@@ -331,6 +332,14 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
 			return err
 		}
 
+		cr.mu.Lock()
+		paused := cr.paused
+		cr.mu.Unlock()
+
+		if paused {
+			continue
+		}
+
 		if isLinkHeartbeatMessage(m) {
 			// raft is not interested in link layer
 			// heartbeat message, so we should ignore
@@ -463,6 +472,18 @@ func (cr *streamReader) close() {
 	cr.closer = nil
 }
 
+func (cr *streamReader) pause() {
+	cr.mu.Lock()
+	defer cr.mu.Unlock()
+	cr.paused = true
+}
+
+func (cr *streamReader) resume() {
+	cr.mu.Lock()
+	defer cr.mu.Unlock()
+	cr.paused = false
+}
+
 func isClosedConnectionError(err error) bool {
 	operr, ok := err.(*net.OpError)
 	return ok && operr.Err.Error() == "use of closed network connection"
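
Note for reviewers: the stream.go change gates message delivery on a
mutex-guarded paused flag inside the decode loop, rather than tearing down the
connection, so a paused streamReader keeps decoding (and discarding) inbound
messages and can resume without a reconnect. Below is a minimal, self-contained
sketch of that pattern; the reader type and its channel-based message source
are hypothetical stand-ins written for this note, not etcd code.

    package main

    import (
    	"fmt"
    	"sync"
    )

    // reader mimics the streamReader pattern from the diff: messages are
    // still decoded while paused, but dropped instead of being delivered.
    type reader struct {
    	mu     sync.Mutex
    	paused bool
    }

    func (r *reader) pause() {
    	r.mu.Lock()
    	defer r.mu.Unlock()
    	r.paused = true
    }

    func (r *reader) resume() {
    	r.mu.Lock()
    	defer r.mu.Unlock()
    	r.paused = false
    }

    // decodeLoop drains src, delivering to dst only while not paused.
    func (r *reader) decodeLoop(src <-chan string, dst chan<- string) {
    	for m := range src {
    		r.mu.Lock()
    		paused := r.paused
    		r.mu.Unlock()
    		if paused {
    			continue // drop the message; the "connection" stays open
    		}
    		dst <- m
    	}
    	close(dst)
    }

    func main() {
    	src := make(chan string)
    	dst := make(chan string)
    	r := &reader{}
    	go r.decodeLoop(src, dst)

    	src <- "a" // delivered
    	fmt.Println(<-dst)

    	r.pause()
    	src <- "b" // silently dropped

    	r.resume()
    	src <- "c" // delivered again
    	fmt.Println(<-dst)
    	close(src)
    }

Dropping decoded messages while paused is what lets the new test simulate a
partitioned leader without disturbing the underlying TCP streams, and it is why
peer.Pause() and peer.Resume() now toggle the msgApp readers as well.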