Simplify lease management after cindex update is moved to 'hooks'.

This commit is contained in:
Piotr Tabor 2021-04-28 08:49:40 +02:00
parent 2dbecea5b2
commit 2fb6f0a74b
5 changed files with 36 additions and 45 deletions

View File

@ -301,9 +301,7 @@ type backendHooks struct {
}
func (bh *backendHooks) OnPreCommitUnsafe(tx backend.BatchTx) {
if bh.indexer != nil {
bh.indexer.UnsafeSave(tx)
}
bh.indexer.UnsafeSave(tx)
}
// NewServer creates a new EtcdServer from the supplied configuration. The
@ -358,8 +356,11 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
bepath := cfg.BackendPath()
beExist := fileutil.Exist(bepath)
beHooks := &backendHooks{lg: cfg.Logger}
ci := cindex.NewConsistentIndex(nil)
beHooks := &backendHooks{lg: cfg.Logger, indexer: ci}
be := openBackend(cfg, beHooks)
ci.SetBackend(be)
cindex.CreateMetaBucket(be.BatchTx())
defer func() {
if err != nil {
@ -543,7 +544,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
peerRt: prt,
reqIDGen: idutil.NewGenerator(uint16(id), time.Now()),
AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
consistIndex: cindex.NewConsistentIndex(be.BatchTx()),
consistIndex: ci,
firstCommitInTermC: make(chan struct{}),
}
serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1)
@ -556,16 +557,11 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
srv.lessor = lease.NewLessor(
srv.Logger(),
srv.be,
lease.LessorConfig{
MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
CheckpointInterval: cfg.LeaseCheckpointInterval,
ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
},
srv.consistIndex,
)
srv.lessor = lease.NewLessor(srv.Logger(), srv.be, lease.LessorConfig{
MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
CheckpointInterval: cfg.LeaseCheckpointInterval,
ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
})
tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
func(index uint64) <-chan struct{} {
@ -577,12 +573,9 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
cfg.Logger.Warn("failed to create token provider", zap.Error(err))
return nil, err
}
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
// Backend should contain 'meta' bucket going forward.
beHooks.indexer = srv.consistIndex
kvindex := srv.consistIndex.ConsistentIndex()
kvindex := ci.ConsistentIndex()
srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex))
if beExist {
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
@ -1210,6 +1203,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
lg.Panic("failed to restore mvcc store", zap.Error(err))
}
s.consistIndex.SetBackend(newbe)
lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))
// Closing old backend might block until all the txns

View File

@ -31,7 +31,7 @@ func TestRenewHTTP(t *testing.T) {
be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, be)
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}, nil)
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
@ -55,7 +55,7 @@ func TestTimeToLiveHTTP(t *testing.T) {
be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, be)
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}, nil)
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
@ -96,7 +96,7 @@ func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) {
be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, be)
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}, nil)
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {

View File

@ -25,7 +25,6 @@ import (
"time"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/lease/leasepb"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.uber.org/zap"
@ -182,7 +181,6 @@ type lessor struct {
checkpointInterval time.Duration
// the interval to check if the expired lease is revoked
expiredLeaseRetryInterval time.Duration
ci cindex.ConsistentIndexer
}
type LessorConfig struct {
@ -191,11 +189,11 @@ type LessorConfig struct {
ExpiredLeasesRetryInterval time.Duration
}
func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig, ci cindex.ConsistentIndexer) Lessor {
return newLessor(lg, b, cfg, ci)
func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor {
return newLessor(lg, b, cfg)
}
func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig, ci cindex.ConsistentIndexer) *lessor {
func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
checkpointInterval := cfg.CheckpointInterval
expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval
if checkpointInterval == 0 {
@ -218,7 +216,6 @@ func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig, ci cindex.Co
stopC: make(chan struct{}),
doneC: make(chan struct{}),
lg: lg,
ci: ci,
}
l.initAndRecover()

View File

@ -68,7 +68,7 @@ func setUp(t testing.TB) (le *lessor, tearDown func()) {
be, _ := betesting.NewDefaultTmpBackend(t)
// MinLeaseTTL is negative, so we can grant expired lease in benchmark.
// ExpiredLeasesRetryInterval should small, so benchmark of findExpired will recheck expired lease.
le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond}, nil)
le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond})
le.SetRangeDeleter(func() TxnDelete {
ftd := &FakeTxnDelete{be.BatchTx()}
ftd.Lock()

View File

@ -45,7 +45,7 @@ func TestLessorGrant(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
le.Promote(0)
@ -107,7 +107,7 @@ func TestLeaseConcurrentKeys(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
@ -156,7 +156,7 @@ func TestLessorRevoke(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
var fd *fakeDeleter
le.SetRangeDeleter(func() TxnDelete {
@ -209,7 +209,7 @@ func TestLessorRenew(t *testing.T) {
defer be.Close()
defer os.RemoveAll(dir)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
le.Promote(0)
@ -242,7 +242,7 @@ func TestLessorRenewWithCheckpointer(t *testing.T) {
defer be.Close()
defer os.RemoveAll(dir)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
for _, cp := range cp.GetCheckpoints() {
le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL())
@ -291,7 +291,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
ttl := int64(10)
for i := 1; i <= leaseRevokeRate*10; i++ {
if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {
@ -310,7 +310,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
bcfg.Path = filepath.Join(dir, "be")
be = backend.New(bcfg)
defer be.Close()
le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
// extend after recovery should extend expiration on lease pile-up
@ -340,7 +340,7 @@ func TestLessorDetach(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
@ -381,7 +381,7 @@ func TestLessorRecover(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
l1, err1 := le.Grant(1, 10)
l2, err2 := le.Grant(2, 20)
@ -390,7 +390,7 @@ func TestLessorRecover(t *testing.T) {
}
// Create a new lessor with the same backend
nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer nle.Stop()
nl1 := nle.Lookup(l1.ID)
if nl1 == nil || nl1.ttl != l1.ttl {
@ -411,7 +411,7 @@ func TestLessorExpire(t *testing.T) {
testMinTTL := int64(1)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}, nil)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
defer le.Stop()
le.Promote(1 * time.Second)
@ -464,7 +464,7 @@ func TestLessorExpireAndDemote(t *testing.T) {
testMinTTL := int64(1)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}, nil)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
defer le.Stop()
le.Promote(1 * time.Second)
@ -513,7 +513,7 @@ func TestLessorMaxTTL(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
_, err := le.Grant(1, MaxLeaseTTL+1)
@ -529,7 +529,7 @@ func TestLessorCheckpointScheduling(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second}, nil)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
le.minLeaseTTL = 1
checkpointedC := make(chan struct{})
le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) {
@ -564,7 +564,7 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
l, err := le.Grant(1, 10)
if err != nil {