diff --git a/server/mvcc/backend/backend.go b/server/mvcc/backend/backend.go index 6c7037c24..84f35d141 100644 --- a/server/mvcc/backend/backend.go +++ b/server/mvcc/backend/backend.go @@ -104,6 +104,8 @@ type backend struct { stopc chan struct{} donec chan struct{} + hooks Hooks + lg *zap.Logger } @@ -124,6 +126,9 @@ type BackendConfig struct { UnsafeNoFsync bool `json:"unsafe-no-fsync"` // Mlock prevents backend database file to be swapped Mlock bool + + // Hooks are getting executed during lifecycle of Backend's transactions. + Hooks Hooks } func DefaultBackendConfig() BackendConfig { @@ -189,6 +194,9 @@ func newBackend(bcfg BackendConfig) *backend { lg: bcfg.Logger, } b.batchTx = newBatchTxBuffered(b) + // We set it after newBatchTxBuffered to skip the 'empty' commit. + b.hooks = bcfg.Hooks + go b.run() return b } diff --git a/server/mvcc/backend/batch_tx.go b/server/mvcc/backend/batch_tx.go index d4bc8c684..74107b445 100644 --- a/server/mvcc/backend/batch_tx.go +++ b/server/mvcc/backend/batch_tx.go @@ -109,6 +109,7 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo t.backend.lg.Fatal( "failed to find a bucket", zap.String("bucket-name", string(bucketName)), + zap.Stack("stack"), ) } if seq { @@ -133,6 +134,7 @@ func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][] t.backend.lg.Fatal( "failed to find a bucket", zap.String("bucket-name", string(bucketName)), + zap.Stack("stack"), ) } return unsafeRange(bucket.Cursor(), key, endKey, limit) @@ -167,6 +169,7 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) { t.backend.lg.Fatal( "failed to find a bucket", zap.String("bucket-name", string(bucketName)), + zap.Stack("stack"), ) } err := bucket.Delete(key) @@ -283,6 +286,10 @@ func (t *batchTxBuffered) CommitAndStop() { } func (t *batchTxBuffered) commit(stop bool) { + if t.backend.hooks != nil { + t.backend.hooks.OnPreCommitUnsafe(t) + } + // all read txs must be closed to acquire boltdb commit rwlock t.backend.readTx.Lock() t.unsafeCommit(stop) diff --git a/server/mvcc/backend/hooks.go b/server/mvcc/backend/hooks.go new file mode 100644 index 000000000..9750828ef --- /dev/null +++ b/server/mvcc/backend/hooks.go @@ -0,0 +1,36 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package backend + +type HookFunc func(tx BatchTx) + +// Hooks allow to add additional logic executed during transaction lifetime. +type Hooks interface { + // OnPreCommitUnsafe is executed before Commit of transactions. + // The given transaction is already locked. + OnPreCommitUnsafe(tx BatchTx) +} + +type hooks struct { + onPreCommitUnsafe HookFunc +} + +func (h hooks) OnPreCommitUnsafe(tx BatchTx) { + h.onPreCommitUnsafe(tx) +} + +func NewHooks(onPreCommitUnsafe HookFunc) Hooks { + return hooks{onPreCommitUnsafe: onPreCommitUnsafe} +}