Mirror of https://github.com/etcd-io/etcd.git (synced 2024-09-27 06:25:44 +00:00)
lease,integration: add checkpoint scheduling after leader change
The current checkpointing mechanism is buggy: new checkpoints for any lease are scheduled only until the first leader change. This commit adds a fix for that and a test that verifies it.

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
parent 7ee7029c08
commit a30aba8fc2
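As context for the hunks below: a lease's checkpoints are scheduled on the leader that granted it, so the schedule exists only in that node's in-memory lessor. A newly promoted leader refreshes the leases but, before this change, never built its own checkpoint schedule, so no further checkpoints were sent after the first leader change. The fix is to schedule a checkpoint for every lease during promotion (the Promote hunk further down). A minimal, self-contained sketch of the idea, using hypothetical types and names rather than etcd's actual lessor:

package main

import (
	"fmt"
	"time"
)

// Hypothetical, simplified model of a lessor; names do not match etcd's code.
type lease struct {
	id  int64
	ttl time.Duration
}

type lessor struct {
	leases          map[int64]*lease
	checkpointQueue []int64 // lease IDs with a pending checkpoint (a heap in the real lessor)
}

// grant registers a lease and schedules its first checkpoint (leader only).
func (le *lessor) grant(id int64, ttl time.Duration) {
	le.leases[id] = &lease{id: id, ttl: ttl}
	le.checkpointQueue = append(le.checkpointQueue, id)
}

// promote runs on the node that just became leader. The schedule built by the
// previous leader does not exist here, so every lease must be re-registered;
// without this loop, no checkpoints are sent after the first leader change.
func (le *lessor) promote() {
	for id := range le.leases {
		le.checkpointQueue = append(le.checkpointQueue, id)
	}
}

func main() {
	le := &lessor{leases: map[int64]*lease{}}
	le.grant(1, 300*time.Second)

	le.checkpointQueue = nil // leader change: the old leader's in-memory schedule is gone
	le.promote()             // new leader rebuilds its own schedule

	fmt.Printf("pending checkpoints after promotion: %d\n", len(le.checkpointQueue))
}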
@@ -313,54 +313,95 @@ func TestV3LeaseKeepAlive(t *testing.T) {
 // TestV3LeaseCheckpoint ensures a lease checkpoint results in a remaining TTL being persisted
 // across leader elections.
 func TestV3LeaseCheckpoint(t *testing.T) {
-	var ttl int64 = 300
-	leaseInterval := 2 * time.Second
-	defer testutil.AfterTest(t)
-	clus := NewClusterV3(t, &ClusterConfig{
-		Size:                    3,
-		EnableLeaseCheckpoint:   true,
-		LeaseCheckpointInterval: leaseInterval,
-	})
-	defer clus.Terminate(t)
-
-	// create lease
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	c := toGRPC(clus.RandClient())
-	lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: ttl})
-	if err != nil {
-		t.Fatal(err)
+	tcs := []struct {
+		name                  string
+		checkpointingEnabled  bool
+		ttl                   time.Duration
+		checkpointingInterval time.Duration
+		leaderChanges         int
+		expectTTLIsGT         time.Duration
+		expectTTLIsLT         time.Duration
+	}{
+		{
+			name:          "Checkpointing disabled, lease TTL is reset",
+			ttl:           300 * time.Second,
+			leaderChanges: 1,
+			expectTTLIsGT: 298 * time.Second,
+		},
+		{
+			name:                  "Checkpointing enabled 10s, lease TTL is preserved after leader change",
+			ttl:                   300 * time.Second,
+			checkpointingEnabled:  true,
+			checkpointingInterval: 10 * time.Second,
+			leaderChanges:         1,
+			expectTTLIsLT:         290 * time.Second,
+		},
+		{
+			// Checking if checkpointing continues after the first leader change.
+			name:                  "Checkpointing enabled 10s, lease TTL is preserved after 2 leader changes",
+			ttl:                   300 * time.Second,
+			checkpointingEnabled:  true,
+			checkpointingInterval: 10 * time.Second,
+			leaderChanges:         2,
+			expectTTLIsLT:         280 * time.Second,
+		},
 	}
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			config := &ClusterConfig{
+				Size:                    3,
+				EnableLeaseCheckpoint:   tc.checkpointingEnabled,
+				LeaseCheckpointInterval: tc.checkpointingInterval,
+			}
+			clus := NewClusterV3(t, config)
+			defer clus.Terminate(t)

-	// wait for a checkpoint to occur
-	time.Sleep(leaseInterval + 1*time.Second)
-
-	// Force a leader election
-	leaderId := clus.WaitLeader(t)
-	leader := clus.Members[leaderId]
-	leader.Stop(t)
-	time.Sleep(time.Duration(3*electionTicks) * tickDuration)
-	leader.Restart(t)
-	newLeaderId := clus.WaitLeader(t)
-	c2 := toGRPC(clus.Client(newLeaderId))
-
-	time.Sleep(250 * time.Millisecond)
-
-	// Check the TTL of the new leader
-	var ttlresp *pb.LeaseTimeToLiveResponse
-	for i := 0; i < 10; i++ {
-		if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil {
-			if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable {
-				time.Sleep(time.Millisecond * 250)
-			} else {
+			// create lease
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			c := toGRPC(clus.RandClient())
+			lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: int64(tc.ttl.Seconds())})
+			if err != nil {
 				t.Fatal(err)
 			}
-		}
-	}
-
-	expectedTTL := ttl - int64(leaseInterval.Seconds())
-	if ttlresp.TTL < expectedTTL-1 || ttlresp.TTL > expectedTTL {
-		t.Fatalf("expected lease to be checkpointed after restart such that %d < TTL <%d, but got TTL=%d", expectedTTL-1, expectedTTL, ttlresp.TTL)
+			for i := 0; i < tc.leaderChanges; i++ {
+				// wait for a checkpoint to occur
+				time.Sleep(tc.checkpointingInterval + 1*time.Second)
+
+				// Force a leader election
+				leaderId := clus.WaitLeader(t)
+				leader := clus.Members[leaderId]
+				leader.Stop(t)
+				time.Sleep(time.Duration(3*electionTicks) * tickDuration)
+				leader.Restart(t)
+			}
+
+			newLeaderId := clus.WaitLeader(t)
+			c2 := toGRPC(clus.Client(newLeaderId))
+
+			time.Sleep(250 * time.Millisecond)
+
+			// Check the TTL of the new leader
+			var ttlresp *pb.LeaseTimeToLiveResponse
+			for i := 0; i < 10; i++ {
+				if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil {
+					if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable {
+						time.Sleep(time.Millisecond * 250)
+					} else {
+						t.Fatal(err)
+					}
+				}
+			}
+
+			if tc.expectTTLIsGT != 0 && time.Duration(ttlresp.TTL)*time.Second <= tc.expectTTLIsGT {
+				t.Errorf("Expected lease ttl (%v) to be greather than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsGT)
+			}
+
+			if tc.expectTTLIsLT != 0 && time.Duration(ttlresp.TTL)*time.Second > tc.expectTTLIsLT {
+				t.Errorf("Expected lease ttl (%v) to be lower than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsLT)
+			}
+		})
 	}
 }

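The bounds in the test table above follow from the timing of the loop that forces elections: each leader change first sleeps checkpointingInterval + 1s, so a checkpointed 300s lease should report well under 300s on the new leader, while with checkpointing disabled the new leader resets the lease to its full TTL. A rough sketch of that arithmetic (an inference from the sleeps in the test, not something the diff states):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Rough arithmetic behind expectTTLIsLT/expectTTLIsGT in the test table above.
	ttl := 300 * time.Second
	interval := 10 * time.Second
	perLeaderChange := interval + 1*time.Second // sleep before each forced election

	fmt.Println(ttl - 1*perLeaderChange) // 4m49s -> expect TTL < 290s after one leader change
	fmt.Println(ttl - 2*perLeaderChange) // 4m38s -> expect TTL < 280s after two leader changes
	// With checkpointing disabled the new leader resets the lease, so the TTL stays above 298s.
}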
@@ -448,6 +448,7 @@ func (le *lessor) Promote(extend time.Duration) {
 		l.refresh(extend)
 		item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
 		le.leaseExpiredNotifier.RegisterOrUpdate(item)
+		le.scheduleCheckpointIfNeeded(l)
 	}

 	if len(le.leaseMap) < leaseRevokeRate {
@@ -533,6 +533,7 @@ func TestLessorCheckpointScheduling(t *testing.T) {
 	defer be.Close()

 	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
+	defer le.Stop()
 	le.minLeaseTTL = 1
 	checkpointedC := make(chan struct{})
 	le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) {
@@ -545,13 +546,11 @@ func TestLessorCheckpointScheduling(t *testing.T) {
 			t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL)
 		}
 	})
-	defer le.Stop()
-	le.Promote(0)
-
 	_, err := le.Grant(1, 2)
 	if err != nil {
 		t.Fatal(err)
 	}
+	le.Promote(0)

 	// TODO: Is there any way to avoid doing this wait? Lease TTL granularity is in seconds.
 	select {
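The hunk ends at the select statement, so the rest of the test is not shown; presumably it blocks on checkpointedC with a timeout and fails if the checkpointer is never invoked. A generic sketch of that wait pattern (not the verbatim test body; the goroutine and the 2-second bound are assumptions):

package main

import (
	"fmt"
	"time"
)

func main() {
	checkpointedC := make(chan struct{})

	// Stand-in for the lessor's checkpoint loop firing after roughly one interval.
	go func() {
		time.Sleep(500 * time.Millisecond)
		close(checkpointedC)
	}()

	select {
	case <-checkpointedC:
		fmt.Println("checkpointer was called")
	case <-time.After(2 * time.Second):
		fmt.Println("expected checkpointer to be called, but it was not")
	}
}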