mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
*: ensure grant/revoke won't be applied repeatedly after restarting etcd
This commit is contained in:
parent
a4ada8cb1f
commit
e9ae8eb5a1
@ -406,11 +406,12 @@ func (s *v3Manager) saveDB() error {
|
|||||||
// having a new raft instance
|
// having a new raft instance
|
||||||
be := backend.NewDefaultBackend(dbpath)
|
be := backend.NewDefaultBackend(dbpath)
|
||||||
|
|
||||||
// a lessor never timeouts leases
|
|
||||||
lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64})
|
|
||||||
|
|
||||||
ci := cindex.NewConsistentIndex(be.BatchTx())
|
ci := cindex.NewConsistentIndex(be.BatchTx())
|
||||||
ci.SetConsistentIndex(uint64(commit))
|
ci.SetConsistentIndex(uint64(commit))
|
||||||
|
|
||||||
|
// a lessor never timeouts leases
|
||||||
|
lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64}, ci)
|
||||||
|
|
||||||
mvs := mvcc.NewStore(s.lg, be, lessor, ci, mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
|
mvs := mvcc.NewStore(s.lg, be, lessor, ci, mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
|
||||||
txn := mvs.Write(traceutil.TODO())
|
txn := mvs.Write(traceutil.TODO())
|
||||||
btx := be.BatchTx()
|
btx := be.BatchTx()
|
||||||
|
@ -523,7 +523,9 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
|
MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
|
||||||
CheckpointInterval: cfg.LeaseCheckpointInterval,
|
CheckpointInterval: cfg.LeaseCheckpointInterval,
|
||||||
ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
|
ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
|
||||||
})
|
},
|
||||||
|
srv.consistIndex,
|
||||||
|
)
|
||||||
|
|
||||||
tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
|
tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
|
||||||
func(index uint64) <-chan struct{} {
|
func(index uint64) <-chan struct{} {
|
||||||
|
@ -33,7 +33,7 @@ func TestRenewHTTP(t *testing.T) {
|
|||||||
defer os.Remove(tmpPath)
|
defer os.Remove(tmpPath)
|
||||||
defer be.Close()
|
defer be.Close()
|
||||||
|
|
||||||
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
|
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}, nil)
|
||||||
le.Promote(time.Second)
|
le.Promote(time.Second)
|
||||||
l, err := le.Grant(1, int64(5))
|
l, err := le.Grant(1, int64(5))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -58,7 +58,7 @@ func TestTimeToLiveHTTP(t *testing.T) {
|
|||||||
defer os.Remove(tmpPath)
|
defer os.Remove(tmpPath)
|
||||||
defer be.Close()
|
defer be.Close()
|
||||||
|
|
||||||
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
|
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}, nil)
|
||||||
le.Promote(time.Second)
|
le.Promote(time.Second)
|
||||||
l, err := le.Grant(1, int64(5))
|
l, err := le.Grant(1, int64(5))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -100,7 +100,7 @@ func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) {
|
|||||||
defer os.Remove(tmpPath)
|
defer os.Remove(tmpPath)
|
||||||
defer be.Close()
|
defer be.Close()
|
||||||
|
|
||||||
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
|
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}, nil)
|
||||||
le.Promote(time.Second)
|
le.Promote(time.Second)
|
||||||
l, err := le.Grant(1, int64(5))
|
l, err := le.Grant(1, int64(5))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"go.etcd.io/etcd/v3/etcdserver/cindex"
|
||||||
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
|
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
|
||||||
"go.etcd.io/etcd/v3/lease/leasepb"
|
"go.etcd.io/etcd/v3/lease/leasepb"
|
||||||
"go.etcd.io/etcd/v3/mvcc/backend"
|
"go.etcd.io/etcd/v3/mvcc/backend"
|
||||||
@ -181,6 +182,7 @@ type lessor struct {
|
|||||||
checkpointInterval time.Duration
|
checkpointInterval time.Duration
|
||||||
// the interval to check if the expired lease is revoked
|
// the interval to check if the expired lease is revoked
|
||||||
expiredLeaseRetryInterval time.Duration
|
expiredLeaseRetryInterval time.Duration
|
||||||
|
ci cindex.ConsistentIndexer
|
||||||
}
|
}
|
||||||
|
|
||||||
type LessorConfig struct {
|
type LessorConfig struct {
|
||||||
@ -189,11 +191,11 @@ type LessorConfig struct {
|
|||||||
ExpiredLeasesRetryInterval time.Duration
|
ExpiredLeasesRetryInterval time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor {
|
func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig, ci cindex.ConsistentIndexer) Lessor {
|
||||||
return newLessor(lg, b, cfg)
|
return newLessor(lg, b, cfg, ci)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
|
func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig, ci cindex.ConsistentIndexer) *lessor {
|
||||||
checkpointInterval := cfg.CheckpointInterval
|
checkpointInterval := cfg.CheckpointInterval
|
||||||
expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval
|
expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval
|
||||||
if checkpointInterval == 0 {
|
if checkpointInterval == 0 {
|
||||||
@ -216,6 +218,7 @@ func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
|
|||||||
stopC: make(chan struct{}),
|
stopC: make(chan struct{}),
|
||||||
doneC: make(chan struct{}),
|
doneC: make(chan struct{}),
|
||||||
lg: lg,
|
lg: lg,
|
||||||
|
ci: ci,
|
||||||
}
|
}
|
||||||
l.initAndRecover()
|
l.initAndRecover()
|
||||||
|
|
||||||
@ -291,7 +294,7 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
le.leaseMap[id] = l
|
le.leaseMap[id] = l
|
||||||
l.persistTo(le.b)
|
l.persistTo(le.b, le.ci)
|
||||||
|
|
||||||
leaseTotalTTLs.Observe(float64(l.ttl))
|
leaseTotalTTLs.Observe(float64(l.ttl))
|
||||||
leaseGranted.Inc()
|
leaseGranted.Inc()
|
||||||
@ -338,6 +341,10 @@ func (le *lessor) Revoke(id LeaseID) error {
|
|||||||
// kv deletion. Or we might end up with not executing the revoke or not
|
// kv deletion. Or we might end up with not executing the revoke or not
|
||||||
// deleting the keys if etcdserver fails in between.
|
// deleting the keys if etcdserver fails in between.
|
||||||
le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)))
|
le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)))
|
||||||
|
// if len(keys) > 0, txn.End() will call ci.UnsafeSave function.
|
||||||
|
if le.ci != nil && len(keys) == 0 {
|
||||||
|
le.ci.UnsafeSave(le.b.BatchTx())
|
||||||
|
}
|
||||||
|
|
||||||
txn.End()
|
txn.End()
|
||||||
|
|
||||||
@ -821,7 +828,7 @@ func (l *Lease) expired() bool {
|
|||||||
return l.Remaining() <= 0
|
return l.Remaining() <= 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Lease) persistTo(b backend.Backend) {
|
func (l *Lease) persistTo(b backend.Backend, ci cindex.ConsistentIndexer) {
|
||||||
key := int64ToBytes(int64(l.ID))
|
key := int64ToBytes(int64(l.ID))
|
||||||
|
|
||||||
lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL}
|
lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL}
|
||||||
@ -832,6 +839,9 @@ func (l *Lease) persistTo(b backend.Backend) {
|
|||||||
|
|
||||||
b.BatchTx().Lock()
|
b.BatchTx().Lock()
|
||||||
b.BatchTx().UnsafePut(leaseBucketName, key, val)
|
b.BatchTx().UnsafePut(leaseBucketName, key, val)
|
||||||
|
if ci != nil {
|
||||||
|
ci.UnsafeSave(b.BatchTx())
|
||||||
|
}
|
||||||
b.BatchTx().Unlock()
|
b.BatchTx().Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -69,7 +69,7 @@ func setUp() (le *lessor, tearDown func()) {
|
|||||||
be, tmpPath := backend.NewDefaultTmpBackend()
|
be, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
// MinLeaseTTL is negative, so we can grant expired lease in benchmark.
|
// MinLeaseTTL is negative, so we can grant expired lease in benchmark.
|
||||||
// ExpiredLeasesRetryInterval should small, so benchmark of findExpired will recheck expired lease.
|
// ExpiredLeasesRetryInterval should small, so benchmark of findExpired will recheck expired lease.
|
||||||
le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond})
|
le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond}, nil)
|
||||||
le.SetRangeDeleter(func() TxnDelete {
|
le.SetRangeDeleter(func() TxnDelete {
|
||||||
ftd := &FakeTxnDelete{be.BatchTx()}
|
ftd := &FakeTxnDelete{be.BatchTx()}
|
||||||
ftd.Lock()
|
ftd.Lock()
|
||||||
|
@ -45,7 +45,7 @@ func TestLessorGrant(t *testing.T) {
|
|||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
defer be.Close()
|
defer be.Close()
|
||||||
|
|
||||||
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
|
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
|
||||||
defer le.Stop()
|
defer le.Stop()
|
||||||
le.Promote(0)
|
le.Promote(0)
|
||||||
|
|
||||||
@ -107,7 +107,7 @@ func TestLeaseConcurrentKeys(t *testing.T) {
|
|||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
defer be.Close()
|
defer be.Close()
|
||||||
|
|
||||||
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
|
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
|
||||||
defer le.Stop()
|
defer le.Stop()
|
||||||
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
|
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
|
||||||
|
|
||||||
@ -156,7 +156,7 @@ func TestLessorRevoke(t *testing.T) {
|
|||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
defer be.Close()
|
defer be.Close()
|
||||||
|
|
||||||
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
|
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
|
||||||
defer le.Stop()
|
defer le.Stop()
|
||||||
var fd *fakeDeleter
|
var fd *fakeDeleter
|
||||||
le.SetRangeDeleter(func() TxnDelete {
|
le.SetRangeDeleter(func() TxnDelete {
|
||||||
@ -209,7 +209,7 @@ func TestLessorRenew(t *testing.T) {
|
|||||||
defer be.Close()
|
defer be.Close()
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
|
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
|
||||||
defer le.Stop()
|
defer le.Stop()
|
||||||
le.Promote(0)
|
le.Promote(0)
|
||||||
|
|
||||||
@ -242,7 +242,7 @@ func TestLessorRenewWithCheckpointer(t *testing.T) {
|
|||||||
defer be.Close()
|
defer be.Close()
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
|
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
|
||||||
fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
|
fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
|
||||||
for _, cp := range cp.GetCheckpoints() {
|
for _, cp := range cp.GetCheckpoints() {
|
||||||
le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL())
|
le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL())
|
||||||
@ -291,7 +291,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
|
|||||||
dir, be := NewTestBackend(t)
|
dir, be := NewTestBackend(t)
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
|
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
|
||||||
ttl := int64(10)
|
ttl := int64(10)
|
||||||
for i := 1; i <= leaseRevokeRate*10; i++ {
|
for i := 1; i <= leaseRevokeRate*10; i++ {
|
||||||
if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {
|
if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {
|
||||||
@ -310,7 +310,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
|
|||||||
bcfg.Path = filepath.Join(dir, "be")
|
bcfg.Path = filepath.Join(dir, "be")
|
||||||
be = backend.New(bcfg)
|
be = backend.New(bcfg)
|
||||||
defer be.Close()
|
defer be.Close()
|
||||||
le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
|
le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
|
||||||
defer le.Stop()
|
defer le.Stop()
|
||||||
|
|
||||||
// extend after recovery should extend expiration on lease pile-up
|
// extend after recovery should extend expiration on lease pile-up
|
||||||
@ -340,7 +340,7 @@ func TestLessorDetach(t *testing.T) {
|
|||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
defer be.Close()
|
defer be.Close()
|
||||||
|
|
||||||
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
|
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
|
||||||
defer le.Stop()
|
defer le.Stop()
|
||||||
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
|
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
|
||||||
|
|
||||||
@ -381,7 +381,7 @@ func TestLessorRecover(t *testing.T) {
|
|||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
defer be.Close()
|
defer be.Close()
|
||||||
|
|
||||||
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
|
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
|
||||||
defer le.Stop()
|
defer le.Stop()
|
||||||
l1, err1 := le.Grant(1, 10)
|
l1, err1 := le.Grant(1, 10)
|
||||||
l2, err2 := le.Grant(2, 20)
|
l2, err2 := le.Grant(2, 20)
|
||||||
@ -390,7 +390,7 @@ func TestLessorRecover(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Create a new lessor with the same backend
|
// Create a new lessor with the same backend
|
||||||
nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
|
nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
|
||||||
defer nle.Stop()
|
defer nle.Stop()
|
||||||
nl1 := nle.Lookup(l1.ID)
|
nl1 := nle.Lookup(l1.ID)
|
||||||
if nl1 == nil || nl1.ttl != l1.ttl {
|
if nl1 == nil || nl1.ttl != l1.ttl {
|
||||||
@ -411,7 +411,7 @@ func TestLessorExpire(t *testing.T) {
|
|||||||
|
|
||||||
testMinTTL := int64(1)
|
testMinTTL := int64(1)
|
||||||
|
|
||||||
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
|
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}, nil)
|
||||||
defer le.Stop()
|
defer le.Stop()
|
||||||
|
|
||||||
le.Promote(1 * time.Second)
|
le.Promote(1 * time.Second)
|
||||||
@ -464,7 +464,7 @@ func TestLessorExpireAndDemote(t *testing.T) {
|
|||||||
|
|
||||||
testMinTTL := int64(1)
|
testMinTTL := int64(1)
|
||||||
|
|
||||||
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
|
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}, nil)
|
||||||
defer le.Stop()
|
defer le.Stop()
|
||||||
|
|
||||||
le.Promote(1 * time.Second)
|
le.Promote(1 * time.Second)
|
||||||
@ -513,7 +513,7 @@ func TestLessorMaxTTL(t *testing.T) {
|
|||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
defer be.Close()
|
defer be.Close()
|
||||||
|
|
||||||
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
|
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
|
||||||
defer le.Stop()
|
defer le.Stop()
|
||||||
|
|
||||||
_, err := le.Grant(1, MaxLeaseTTL+1)
|
_, err := le.Grant(1, MaxLeaseTTL+1)
|
||||||
@ -529,7 +529,7 @@ func TestLessorCheckpointScheduling(t *testing.T) {
|
|||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
defer be.Close()
|
defer be.Close()
|
||||||
|
|
||||||
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
|
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second}, nil)
|
||||||
le.minLeaseTTL = 1
|
le.minLeaseTTL = 1
|
||||||
checkpointedC := make(chan struct{})
|
checkpointedC := make(chan struct{})
|
||||||
le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) {
|
le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) {
|
||||||
@ -564,7 +564,7 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) {
|
|||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
defer be.Close()
|
defer be.Close()
|
||||||
|
|
||||||
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
|
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
|
||||||
defer le.Stop()
|
defer le.Stop()
|
||||||
l, err := le.Grant(1, 10)
|
l, err := le.Grant(1, 10)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user