Merge pull request #12855 from ptabor/20210409-backend-hooks

(no)StoreV2 (Part 4): Backend hooks: precommit updates consistency_index
Piotr Tabor
2021-05-08 09:34:31 +02:00
committed by GitHub
34 changed files with 420 additions and 266 deletions
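
In short: rather than having every apply path remember to persist the consistent index itself, the backend now accepts a Hooks implementation and calls OnPreCommitUnsafe while the batch transaction is still open, so the index is committed atomically with the data it describes. Below is a minimal, runnable sketch of that pattern; the types are simplified stand-ins, not the verbatim etcd API.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"sync/atomic"
)

// batchTx stands in for etcd's backend.BatchTx; an in-memory map keeps
// the sketch runnable without bbolt.
type batchTx struct{ data map[string][]byte }

func (t *batchTx) UnsafePut(key string, value []byte) { t.data[key] = value }

// hooks mirrors the interface this PR introduces: the backend calls
// OnPreCommitUnsafe while it still holds the open transaction.
type hooks interface {
	OnPreCommitUnsafe(tx *batchTx)
}

// indexHook persists the cached consistent index on every commit,
// like backendHooks/UnsafeSave in the diffs below.
type indexHook struct{ consistentIndex uint64 }

func (h *indexHook) OnPreCommitUnsafe(tx *batchTx) {
	bs := make([]byte, 8)
	binary.BigEndian.PutUint64(bs, atomic.LoadUint64(&h.consistentIndex))
	tx.UnsafePut("consistent_index", bs)
}

// backend runs its hook before each commit, so the index always lands in
// the same transaction as the writes it covers.
type backend struct {
	tx *batchTx
	h  hooks
}

func (b *backend) Commit() {
	if b.h != nil {
		b.h.OnPreCommitUnsafe(b.tx) // same tx as the applied writes
	}
	// a real backend would call bolt's tx.Commit() here
}

func main() {
	h := &indexHook{}
	be := &backend{tx: &batchTx{data: map[string][]byte{}}, h: h}

	atomic.StoreUint64(&h.consistentIndex, 42) // an entry was applied
	be.tx.UnsafePut("key", []byte("value"))    // ...and its data written
	be.Commit()

	fmt.Println(binary.BigEndian.Uint64(be.tx.data["consistent_index"])) // 42
}
```

In the actual change, the hook is backendHooks in server.go and the write is cindex.UnsafeUpdateConsistentIndex; most of the rest of the diff is plumbing the hooks into every place a backend gets constructed.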

View File

@@ -34,7 +34,7 @@ import (
 )
 
 type KVGetter interface {
-	KV() mvcc.ConsistentWatchableKV
+	KV() mvcc.WatchableKV
 }
 
 type BackendGetter interface {
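
Consumers only needed ConsistentWatchableKV so they could reach the consistent index through the KV; with the index now owned by cindex and persisted by the backend hook, the narrower mvcc.WatchableKV suffices.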

View File

@@ -28,7 +28,7 @@ import (
 	"go.uber.org/zap"
 )
 
-func newBackend(cfg config.ServerConfig) backend.Backend {
+func newBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
 	bcfg := backend.DefaultBackendConfig()
 	bcfg.Path = cfg.BackendPath()
 	bcfg.UnsafeNoFsync = cfg.UnsafeNoFsync
@@ -51,12 +51,12 @@ func newBackend(cfg config.ServerConfig) backend.Backend {
 		bcfg.MmapSize = uint64(cfg.QuotaBackendBytes + cfg.QuotaBackendBytes/10)
 	}
 	bcfg.Mlock = cfg.ExperimentalMemoryMlock
+	bcfg.Hooks = hooks
 	return backend.New(bcfg)
 }
 
 // openSnapshotBackend renames a snapshot db to the current etcd db and opens it.
-func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) {
+func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot, hooks backend.Hooks) (backend.Backend, error) {
 	snapPath, err := ss.DBFilePath(snapshot.Metadata.Index)
 	if err != nil {
 		return nil, fmt.Errorf("failed to find database snapshot file (%v)", err)
@@ -64,16 +64,16 @@ func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot
 	if err := os.Rename(snapPath, cfg.BackendPath()); err != nil {
 		return nil, fmt.Errorf("failed to rename database snapshot file (%v)", err)
 	}
-	return openBackend(cfg), nil
+	return openBackend(cfg, hooks), nil
 }
 
 // openBackend returns a backend using the current etcd db.
-func openBackend(cfg config.ServerConfig) backend.Backend {
+func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
 	fn := cfg.BackendPath()
 
 	now, beOpened := time.Now(), make(chan backend.Backend)
 	go func() {
-		beOpened <- newBackend(cfg)
+		beOpened <- newBackend(cfg, hooks)
 	}()
 
 	select {
@@ -96,15 +96,14 @@ func openBackend(cfg config.ServerConfig) backend.Backend {
 // before updating the backend db after persisting raft snapshot to disk,
 // violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this
 // case, replace the db with the snapshot db sent by the leader.
-func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool) (backend.Backend, error) {
+func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) {
 	consistentIndex := uint64(0)
 	if beExist {
-		ci := cindex.NewConsistentIndex(oldbe.BatchTx())
-		consistentIndex = ci.ConsistentIndex()
+		consistentIndex = cindex.ReadConsistentIndex(oldbe.BatchTx())
 	}
 	if snapshot.Metadata.Index <= consistentIndex {
 		return oldbe, nil
 	}
 	oldbe.Close()
-	return openSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot)
+	return openSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot, hooks)
}
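
The construction paths all change shape the same way: hooks ride along as a parameter until they reach backend.New via bcfg.Hooks. openBackend's slow-open diagnostic is untouched but worth noting — opening happens on a goroutine so the caller can log if bolt blocks, for example on a db file still locked by another process. The same idiom in isolation, as a runnable sketch with stand-in names rather than the etcd code:

```go
package main

import (
	"fmt"
	"time"
)

// openSlowResource stands in for newBackend/backend.New: bolt's open can
// block indefinitely if another process holds the db file lock.
func openSlowResource() string {
	time.Sleep(50 * time.Millisecond)
	return "backend"
}

func main() {
	beOpened := make(chan string)
	go func() { beOpened <- openSlowResource() }()

	select {
	case be := <-beOpened:
		fmt.Println("opened:", be)
	case <-time.After(10 * time.Second):
		// etcd logs a warning at this point rather than failing outright,
		// then keeps waiting for the open to finish.
		fmt.Println("open is taking long; db file may be locked by another process")
	}
}
```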

View File

@@ -28,6 +28,10 @@ var (
 	ConsistentIndexKeyName = []byte("consistent_index")
 )
 
+type Backend interface {
+	BatchTx() backend.BatchTx
+}
+
 // ConsistentIndexer is an interface that wraps the Get/Set/Save method for consistentIndex.
 type ConsistentIndexer interface {
@@ -41,32 +45,38 @@ type ConsistentIndexer interface {
 	// It saves consistentIndex to the underlying stable storage.
 	UnsafeSave(tx backend.BatchTx)
 
-	// SetBatchTx set the available backend.BatchTx for ConsistentIndexer.
-	SetBatchTx(tx backend.BatchTx)
+	// SetBackend sets the available Backend for the ConsistentIndexer.
+	SetBackend(be Backend)
 }
 
 // consistentIndex implements the ConsistentIndexer interface.
 type consistentIndex struct {
-	tx backend.BatchTx
 	// consistentIndex represents the offset of an entry in a consistent replica log.
-	// it caches the "consistent_index" key's value. Accessed
-	// through atomics so must be 64-bit aligned.
+	// It caches the "consistent_index" key's value.
+	// Accessed through atomics, so it must be 64-bit aligned.
 	consistentIndex uint64
-	mutex sync.Mutex
+
+	// be is used for the initial read of consistentIndex.
+	be Backend
+	// mutex protects be.
+	mutex sync.Mutex
 }
 
-func NewConsistentIndex(tx backend.BatchTx) ConsistentIndexer {
-	return &consistentIndex{tx: tx}
+// NewConsistentIndex creates a new consistent index.
+// If `be` is nil, it must be set (via SetBackend) before the first call to `ConsistentIndex()`.
+func NewConsistentIndex(be Backend) ConsistentIndexer {
+	return &consistentIndex{be: be}
 }
 
 func (ci *consistentIndex) ConsistentIndex() uint64 {
 	if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 {
 		return index
 	}
	ci.mutex.Lock()
 	defer ci.mutex.Unlock()
-	v := ReadConsistentIndex(ci.tx)
+	v := ReadConsistentIndex(ci.be.BatchTx())
 	atomic.StoreUint64(&ci.consistentIndex, v)
 	return v
 }
@@ -76,17 +86,15 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64) {
 func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
 	index := atomic.LoadUint64(&ci.consistentIndex)
-	if index == 0 {
-		// Never save 0 as it means that we didn't loaded the real index yet.
-		return
-	}
-	unsafeUpdateConsistentIndex(tx, index)
+	UnsafeUpdateConsistentIndex(tx, index, true)
 }
 
-func (ci *consistentIndex) SetBatchTx(tx backend.BatchTx) {
+func (ci *consistentIndex) SetBackend(be Backend) {
 	ci.mutex.Lock()
 	defer ci.mutex.Unlock()
-	ci.tx = tx
+	ci.be = be
+	// After the backend is changed, the first access should re-read it.
+	ci.SetConsistentIndex(0)
 }
 
 func NewFakeConsistentIndex(index uint64) ConsistentIndexer {
@@ -101,13 +109,21 @@ func (f *fakeConsistentIndex) SetConsistentIndex(index uint64) {
 	atomic.StoreUint64(&f.index, index)
 }
 
-func (f *fakeConsistentIndex) UnsafeSave(tx backend.BatchTx) {}
-func (f *fakeConsistentIndex) SetBatchTx(tx backend.BatchTx) {}
+func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
+func (f *fakeConsistentIndex) SetBackend(_ Backend)         {}
 
 // UnsafeCreateMetaBucket creates the `meta` bucket (if it does not exist yet).
 func UnsafeCreateMetaBucket(tx backend.BatchTx) {
 	tx.UnsafeCreateBucket(MetaBucketName)
 }
 
+// CreateMetaBucket creates the `meta` bucket (if it does not exist yet).
+func CreateMetaBucket(tx backend.BatchTx) {
+	tx.Lock()
+	defer tx.Unlock()
+	tx.UnsafeCreateBucket(MetaBucketName)
+}
+
 // unsafeReadConsistentIndex loads the consistent index from the given transaction.
 // Returns 0 if the data is not found.
 func unsafeReadConsistentIndex(tx backend.ReadTx) uint64 {
@@ -127,7 +143,19 @@ func ReadConsistentIndex(tx backend.ReadTx) uint64 {
 	return unsafeReadConsistentIndex(tx)
 }
 
-func unsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64) {
+func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, onlyGrow bool) {
+	if index == 0 {
+		// Never save 0, as it means that we didn't load the real index yet.
+		return
+	}
+
+	if onlyGrow {
+		oldi := unsafeReadConsistentIndex(tx)
+		if index <= oldi {
+			return
+		}
+	}
+
 	bs := make([]byte, 8) // this is kept on the stack (not the heap), so it's quick
 	binary.BigEndian.PutUint64(bs, index)
 	// put the index into the underlying backend
@@ -135,13 +163,8 @@ func unsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64) {
 	tx.UnsafePut(MetaBucketName, ConsistentIndexKeyName, bs)
 }
 
-func UpdateConsistentIndex(tx backend.BatchTx, index uint64) {
+func UpdateConsistentIndex(tx backend.BatchTx, index uint64, onlyGrow bool) {
 	tx.Lock()
 	defer tx.Unlock()
-	oldi := unsafeReadConsistentIndex(tx)
-	if index <= oldi {
-		return
-	}
-	unsafeUpdateConsistentIndex(tx, index)
+	UnsafeUpdateConsistentIndex(tx, index, onlyGrow)
}
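
Two guards now live in UnsafeUpdateConsistentIndex: a zero index is never persisted (zero means the real value was never loaded), and with onlyGrow the stored value can only move forward — protection against clobbering a real index with a stale one. On the read side, ConsistentIndex() is a read-mostly cache: an atomic fast path, and a mutex only around the first backend read. Both patterns in isolation, as a runnable sketch with simplified storage in place of the etcd types:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// cachedIndex mirrors consistentIndex's read path: atomics for the fast
// path, a mutex only around the storage read that fills the cache.
type cachedIndex struct {
	index uint64 // accessed through atomics, so it must stay 64-bit aligned
	mu    sync.Mutex
	read  func() uint64 // stands in for ReadConsistentIndex(be.BatchTx())
}

func (c *cachedIndex) Get() uint64 {
	if v := atomic.LoadUint64(&c.index); v > 0 {
		return v // fast path: cached, no locking
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	v := c.read() // slow path: one read from stable storage
	atomic.StoreUint64(&c.index, v)
	return v
}

// update mirrors UnsafeUpdateConsistentIndex's guards: never store zero,
// and with onlyGrow never move the persisted value backwards.
func update(stored *uint64, index uint64, onlyGrow bool) {
	if index == 0 {
		return // zero means the real index was never loaded
	}
	if onlyGrow && index <= *stored {
		return
	}
	*stored = index
}

func main() {
	c := &cachedIndex{read: func() uint64 { return 7 }}
	fmt.Println(c.Get(), c.Get()) // the second call never touches storage

	var stored uint64 = 9
	update(&stored, 5, true) // ignored: would move the index backwards
	update(&stored, 12, true)
	fmt.Println(stored) // 12
}
```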

View File

@@ -27,13 +27,14 @@ import (
 func TestConsistentIndex(t *testing.T) {
 	be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
-	ci := NewConsistentIndex(be.BatchTx())
+	ci := NewConsistentIndex(be)
 
 	tx := be.BatchTx()
 	if tx == nil {
 		t.Fatal("batch tx is nil")
 	}
 	tx.Lock()
 	UnsafeCreateMetaBucket(tx)
 	tx.Unlock()
 	be.ForceCommit()
@@ -51,14 +52,13 @@ func TestConsistentIndex(t *testing.T) {
 	b := backend.NewDefaultBackend(tmpPath)
 	defer b.Close()
 	ci.SetConsistentIndex(0)
-	ci.SetBatchTx(b.BatchTx())
+	ci.SetBackend(b)
 	index = ci.ConsistentIndex()
 	if index != r {
 		t.Errorf("expected %d,got %d", r, index)
 	}
 
-	ci = NewConsistentIndex(b.BatchTx())
+	ci = NewConsistentIndex(b)
 	index = ci.ConsistentIndex()
 	if index != r {
 		t.Errorf("expected %d,got %d", r, index)
View File

@@ -256,10 +256,11 @@ type EtcdServer struct {
 	applyV3Internal applierV3Internal
 	applyWait       wait.WaitTime
 
-	kv         mvcc.ConsistentWatchableKV
+	kv         mvcc.WatchableKV
 	lessor     lease.Lessor
 	bemu       sync.Mutex
 	be         backend.Backend
+	beHooks    backend.Hooks
 	authStore  auth.AuthStore
 	alarmStore *v3alarm.AlarmStore
@@ -294,6 +295,15 @@ type EtcdServer struct {
 	*AccessController
 }
 
+type backendHooks struct {
+	indexer cindex.ConsistentIndexer
+	lg      *zap.Logger
+}
+
+func (bh *backendHooks) OnPreCommitUnsafe(tx backend.BatchTx) {
+	bh.indexer.UnsafeSave(tx)
+}
+
 // NewServer creates a new EtcdServer from the supplied configuration. The
 // configuration is considered static for the lifetime of the EtcdServer.
 func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
@@ -345,7 +355,12 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 	bepath := cfg.BackendPath()
 	beExist := fileutil.Exist(bepath)
-	be := openBackend(cfg)
+
+	ci := cindex.NewConsistentIndex(nil)
+	beHooks := &backendHooks{lg: cfg.Logger, indexer: ci}
+	be := openBackend(cfg, beHooks)
+	ci.SetBackend(be)
+	cindex.CreateMetaBucket(be.BatchTx())
 
 	defer func() {
 		if err != nil {
@@ -463,7 +478,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 			zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))),
 		)
 
-		if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist); err != nil {
+		if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
 			cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
 		}
 		s1, s2 := be.Size(), be.SizeInUse()
@@ -529,7 +544,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 		peerRt:             prt,
 		reqIDGen:           idutil.NewGenerator(uint16(id), time.Now()),
 		AccessController:   &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
-		consistIndex:       cindex.NewConsistentIndex(be.BatchTx()),
+		consistIndex:       ci,
 		firstCommitInTermC: make(chan struct{}),
 	}
 	serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1)
@@ -537,20 +552,16 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 	srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
 
 	srv.be = be
+	srv.beHooks = beHooks
 	minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat
 
 	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
 	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
-	srv.lessor = lease.NewLessor(
-		srv.Logger(),
-		srv.be,
-		lease.LessorConfig{
-			MinLeaseTTL:                int64(math.Ceil(minTTL.Seconds())),
-			CheckpointInterval:         cfg.LeaseCheckpointInterval,
-			ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
-		},
-		srv.consistIndex,
-	)
+	srv.lessor = lease.NewLessor(srv.Logger(), srv.be, lease.LessorConfig{
+		MinLeaseTTL:                int64(math.Ceil(minTTL.Seconds())),
+		CheckpointInterval:         cfg.LeaseCheckpointInterval,
+		ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
+	})
 
 	tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
 		func(index uint64) <-chan struct{} {
@@ -562,8 +573,9 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 		cfg.Logger.Warn("failed to create token provider", zap.Error(err))
 		return nil, err
 	}
-	srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
-	kvindex := srv.consistIndex.ConsistentIndex()
+	srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
+	kvindex := ci.ConsistentIndex()
+	srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex))
 	if beExist {
 		// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
@@ -579,7 +591,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 		}
 	}
 
-	srv.authStore = auth.NewAuthStore(srv.Logger(), srv.be, srv.consistIndex, tp, int(cfg.BcryptCost))
+	srv.authStore = auth.NewAuthStore(srv.Logger(), srv.be, tp, int(cfg.BcryptCost))
 
 	newSrv := srv // since srv == nil in defer if srv is returned as nil
 	defer func() {
@@ -1170,7 +1182,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 	// wait for raftNode to persist snapshot onto the disk
 	<-apply.notifyc
 
-	newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot)
+	newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot, s.beHooks)
 	if err != nil {
 		lg.Panic("failed to open snapshot backend", zap.Error(err))
 	}
@@ -1191,8 +1203,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 		lg.Panic("failed to restore mvcc store", zap.Error(err))
 	}
 
-	s.consistIndex.SetConsistentIndex(s.kv.ConsistentIndex())
-	lg.Info("restored mvcc store")
+	s.consistIndex.SetBackend(newbe)
+	lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))
 
 	// Closing old backend might block until all the txns
 	// on the backend are finished.
@@ -2117,6 +2129,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 		return
 	}
+	s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq))
 	if raftReq.V2 != nil {
 		req := (*RequestV2)(raftReq.V2)
 		s.w.Trigger(req.ID, s.applyV2Request(req))
@@ -2502,7 +2515,7 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
 	}
 }
 
-func (s *EtcdServer) KV() mvcc.ConsistentWatchableKV { return s.kv }
+func (s *EtcdServer) KV() mvcc.WatchableKV { return s.kv }
 func (s *EtcdServer) Backend() backend.Backend {
 	s.bemu.Lock()
 	defer s.bemu.Unlock()
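
The point of all this wiring shows up here: because OnPreCommitUnsafe writes the index inside the same backend transaction as the applied entries, a crash can never persist data without the matching index, and the apply loop can safely skip anything at or below it on restart. A toy illustration of that invariant — assumed semantics, with in-memory state standing in for the bbolt transaction:

```go
package main

import "fmt"

type entry struct {
	index uint64
	key   string
	value string
}

// state commits data and consistentIndex together, mirroring the single
// bolt transaction that the precommit hook guarantees in etcd.
type state struct {
	data            map[string]string
	consistentIndex uint64
}

// apply skips entries at or below the persisted index, so replaying the
// raft log after a restart never applies an entry twice.
func (s *state) apply(e entry) {
	if e.index <= s.consistentIndex {
		return // already applied before the crash
	}
	s.data[e.key] = e.value
	s.consistentIndex = e.index // same "transaction" as the write above
}

func main() {
	s := &state{data: map[string]string{}}
	log := []entry{{1, "a", "1"}, {2, "b", "2"}}

	for _, e := range log {
		s.apply(e)
	}
	// Simulated restart: replaying the whole log is a no-op.
	for _, e := range log {
		s.apply(e)
	}
	fmt.Println(s.data, s.consistentIndex) // map[a:1 b:2] 2
}
```

Note also that applySnapshot no longer copies the index out of the KV; it just repoints the indexer at the new backend with SetBackend and lets the cached value be re-read lazily.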

View File

@@ -989,9 +989,9 @@ func TestSnapshot(t *testing.T) {
 		lg:           zap.NewExample(),
 		r:            *r,
 		v2store:      st,
-		consistIndex: cindex.NewConsistentIndex(be.BatchTx()),
+		consistIndex: cindex.NewConsistentIndex(be),
 	}
-	srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, srv.consistIndex, mvcc.StoreConfig{})
+	srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
 	srv.be = be
 
 	ch := make(chan struct{}, 2)
@@ -1074,11 +1074,11 @@ func TestSnapshotOrdering(t *testing.T) {
 		snapshotter:  snap.New(zap.NewExample(), snapdir),
 		cluster:      cl,
 		SyncTicker:   &time.Ticker{},
-		consistIndex: cindex.NewConsistentIndex(be.BatchTx()),
+		consistIndex: cindex.NewConsistentIndex(be),
 	}
 	s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster}
 
-	s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, s.consistIndex, mvcc.StoreConfig{})
+	s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
 	s.be = be
 
 	s.start()
@@ -1148,11 +1148,11 @@ func TestTriggerSnap(t *testing.T) {
 		v2store:      st,
 		reqIDGen:     idutil.NewGenerator(0, time.Time{}),
 		SyncTicker:   &time.Ticker{},
-		consistIndex: cindex.NewConsistentIndex(be.BatchTx()),
+		consistIndex: cindex.NewConsistentIndex(be),
 	}
 	srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
 
-	srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, srv.consistIndex, mvcc.StoreConfig{})
+	srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
 	srv.be = be
 
 	srv.start()
@@ -1227,11 +1227,11 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
 		snapshotter:  snap.New(zap.NewExample(), testdir),
 		cluster:      cl,
 		SyncTicker:   &time.Ticker{},
-		consistIndex: cindex.NewConsistentIndex(be.BatchTx()),
+		consistIndex: cindex.NewConsistentIndex(be),
 	}
 	s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster}
 
-	s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, s.consistIndex, mvcc.StoreConfig{})
+	s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
 	s.be = be
 
 	s.start()
@@ -1562,7 +1562,7 @@ func TestPublishV3(t *testing.T) {
 		w:          w,
 		reqIDGen:   idutil.NewGenerator(0, time.Time{}),
 		SyncTicker: &time.Ticker{},
-		authStore:  auth.NewAuthStore(lg, be, nil, nil, 0),
+		authStore:  auth.NewAuthStore(lg, be, nil, 0),
 		be:         be,
 		ctx:        ctx,
 		cancel:     cancel,
@@ -1633,7 +1633,7 @@ func TestPublishV3Retry(t *testing.T) {
 		cluster:    &membership.RaftCluster{},
 		reqIDGen:   idutil.NewGenerator(0, time.Time{}),
 		SyncTicker: &time.Ticker{},
-		authStore:  auth.NewAuthStore(lg, be, nil, nil, 0),
+		authStore:  auth.NewAuthStore(lg, be, nil, 0),
 		be:         be,
 		ctx:        ctx,
 		cancel:     cancel,