`unsafeCommit` is called by both `(*batchTxBuffered) commit` and `(*backend) defrag`. When users perform the defragmentation operation, etcd doesn't update the consistent index. If etcd crashes (e.g. panics) during that process for whatever reason, it replays the WAL entries starting from the latest snapshot, and may therefore re-apply entries that have already been applied, leaving the revision inconsistent with the other members. Refer to the discussion in https://github.com/etcd-io/etcd/pull/14685

Signed-off-by: Benjamin Wang <wachao@vmware.com>
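
For context on why the hook ordering matters: etcd decides which WAL entries still need to be applied after a restart by comparing entry indexes against the consistent index persisted in the backend, so the index must be committed in the same boltdb transaction as the data it covers. Below is a minimal sketch of that idea, not the actual etcd hook implementation; `metaBucket` and `consistentIndexKey` are hypothetical stand-ins for the real meta-bucket schema, and `encoding/binary` / `sync/atomic` imports are assumed.

```go
// Sketch: persist the consistent index inside the batch transaction right
// before it commits, so applied data and index hit disk atomically.
type indexHook struct {
	appliedIndex uint64 // advanced atomically by the apply loop
}

func (h *indexHook) OnPreCommitUnsafe(tx BatchTx) {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, atomic.LoadUint64(&h.appliedIndex))
	tx.UnsafePut(metaBucket, consistentIndexKey, buf) // hypothetical bucket/key
}
```
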
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package backend

import (
	"bytes"
	"math"
	"sync"
	"sync/atomic"
	"time"

	bolt "go.etcd.io/bbolt"
	"go.uber.org/zap"
)

type BucketID int

type Bucket interface {
	// ID returns a unique identifier of a bucket.
	// The id must NOT be persisted and can be used as a lightweight identifier
	// in the in-memory maps.
	ID() BucketID
	Name() []byte
	// String implements Stringer (human readable name).
	String() string

	// IsSafeRangeBucket is a hack to avoid inadvertently reading duplicate keys;
	// overwrites on a bucket should only fetch with limit=1, but safeRangeBucket
	// is known to never overwrite any key so range is safe.
	IsSafeRangeBucket() bool
}

type BatchTx interface {
	ReadTx
	UnsafeCreateBucket(bucket Bucket)
	UnsafeDeleteBucket(bucket Bucket)
	UnsafePut(bucket Bucket, key []byte, value []byte)
	UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
	UnsafeDelete(bucket Bucket, key []byte)
	// Commit commits a previous tx and begins a new writable one.
	Commit()
	// CommitAndStop commits the previous tx and does not create a new one.
	CommitAndStop()
	LockInsideApply()
	LockOutsideApply()
}
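
// Illustrative usage sketch (not part of the original file): a typical write
// path against a BatchTx, where `be` is a Backend and exampleBucket is a
// hypothetical schema bucket:
//
//	tx := be.BatchTx()
//	tx.LockOutsideApply() // or LockInsideApply when called from the apply loop
//	tx.UnsafePut(exampleBucket, []byte("k"), []byte("v"))
//	tx.Unlock()  // may auto-commit once pending >= batchLimit
//	tx.Commit()  // or force an immediate commit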

type batchTx struct {
	sync.Mutex
	tx      *bolt.Tx
	backend *backend

	pending int
}

// Lock is supposed to be called only by the unit test.
func (t *batchTx) Lock() {
	ValidateCalledInsideUnittest(t.backend.lg)
	t.lock()
}

func (t *batchTx) lock() {
	t.Mutex.Lock()
}

func (t *batchTx) LockInsideApply() {
	t.lock()
	if t.backend.txPostLockInsideApplyHook != nil {
		// The callers of some methods (e.g., (*RaftCluster).AddMember)
		// can be coming from both InsideApply and OutsideApply, but the
		// callers from OutsideApply will have a nil txPostLockInsideApplyHook.
		// So we should check the txPostLockInsideApplyHook before validating
		// the callstack.
		ValidateCalledInsideApply(t.backend.lg)
		t.backend.txPostLockInsideApplyHook()
	}
}

func (t *batchTx) LockOutsideApply() {
	ValidateCalledOutSideApply(t.backend.lg)
	t.lock()
}

func (t *batchTx) Unlock() {
	if t.pending >= t.backend.batchLimit {
		t.commit(false)
	}
	t.Mutex.Unlock()
}

// BatchTx interface embeds ReadTx interface. But RLock() and RUnlock() do not
// have appropriate semantics in the BatchTx interface. Therefore they should
// not be called.
// TODO: might want to decouple ReadTx and BatchTx

func (t *batchTx) RLock() {
	panic("unexpected RLock")
}

func (t *batchTx) RUnlock() {
	panic("unexpected RUnlock")
}

func (t *batchTx) UnsafeCreateBucket(bucket Bucket) {
	_, err := t.tx.CreateBucket(bucket.Name())
	if err != nil && err != bolt.ErrBucketExists {
		t.backend.lg.Fatal(
			"failed to create a bucket",
			zap.Stringer("bucket-name", bucket),
			zap.Error(err),
		)
	}
	t.pending++
}

func (t *batchTx) UnsafeDeleteBucket(bucket Bucket) {
	err := t.tx.DeleteBucket(bucket.Name())
	if err != nil && err != bolt.ErrBucketNotFound {
		t.backend.lg.Fatal(
			"failed to delete a bucket",
			zap.Stringer("bucket-name", bucket),
			zap.Error(err),
		)
	}
	t.pending++
}

// UnsafePut must be called holding the lock on the tx.
func (t *batchTx) UnsafePut(bucket Bucket, key []byte, value []byte) {
	t.unsafePut(bucket, key, value, false)
}

// UnsafeSeqPut must be called holding the lock on the tx.
func (t *batchTx) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
	t.unsafePut(bucket, key, value, true)
}

func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq bool) {
	bucket := t.tx.Bucket(bucketType.Name())
	if bucket == nil {
		t.backend.lg.Fatal(
			"failed to find a bucket",
			zap.Stringer("bucket-name", bucketType),
			zap.Stack("stack"),
		)
	}
	if seq {
		// it is useful to increase fill percent when the workloads are mostly append-only.
		// this can delay the page split and reduce space usage.
		bucket.FillPercent = 0.9
	}
	if err := bucket.Put(key, value); err != nil {
		t.backend.lg.Fatal(
			"failed to write to a bucket",
			zap.Stringer("bucket-name", bucketType),
			zap.Error(err),
		)
	}
	t.pending++
}

// UnsafeRange must be called holding the lock on the tx.
func (t *batchTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
	bucket := t.tx.Bucket(bucketType.Name())
	if bucket == nil {
		t.backend.lg.Fatal(
			"failed to find a bucket",
			zap.Stringer("bucket-name", bucketType),
			zap.Stack("stack"),
		)
	}
	return unsafeRange(bucket.Cursor(), key, endKey, limit)
}

func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
	if limit <= 0 {
		limit = math.MaxInt64
	}
	var isMatch func(b []byte) bool
	if len(endKey) > 0 {
		isMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 }
	} else {
		isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
		limit = 1
	}

	for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
		vs = append(vs, cv)
		keys = append(keys, ck)
		if limit == int64(len(keys)) {
			break
		}
	}
	return keys, vs
}
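
// For example, given a bucket holding keys "a", "b", and "c":
//
//	unsafeRange(c, []byte("a"), []byte("c"), 0) // -> ["a", "b"]: endKey is exclusive, limit <= 0 means unlimited
//	unsafeRange(c, []byte("b"), nil, 0)         // -> ["b"]: empty endKey means exact match, limit forced to 1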

// UnsafeDelete must be called holding the lock on the tx.
func (t *batchTx) UnsafeDelete(bucketType Bucket, key []byte) {
	bucket := t.tx.Bucket(bucketType.Name())
	if bucket == nil {
		t.backend.lg.Fatal(
			"failed to find a bucket",
			zap.Stringer("bucket-name", bucketType),
			zap.Stack("stack"),
		)
	}
	err := bucket.Delete(key)
	if err != nil {
		t.backend.lg.Fatal(
			"failed to delete a key",
			zap.Stringer("bucket-name", bucketType),
			zap.Error(err),
		)
	}
	t.pending++
}

// UnsafeForEach must be called holding the lock on the tx.
func (t *batchTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error {
	return unsafeForEach(t.tx, bucket, visitor)
}

func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error) error {
	if b := tx.Bucket(bucket.Name()); b != nil {
		return b.ForEach(visitor)
	}
	return nil
}

// Commit commits a previous tx and begins a new writable one.
func (t *batchTx) Commit() {
	t.lock()
	t.commit(false)
	t.Unlock()
}

// CommitAndStop commits the previous tx and does not create a new one.
func (t *batchTx) CommitAndStop() {
	t.lock()
	t.commit(true)
	t.Unlock()
}

func (t *batchTx) safePending() int {
	t.Mutex.Lock()
	defer t.Mutex.Unlock()
	return t.pending
}

func (t *batchTx) commit(stop bool) {
	// commit the last tx
	if t.tx != nil {
		if t.pending == 0 && !stop {
			return
		}

		start := time.Now()

		// gofail: var beforeCommit struct{}
		err := t.tx.Commit()
		// gofail: var afterCommit struct{}

		rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds())
		spillSec.Observe(t.tx.Stats().SpillTime.Seconds())
		writeSec.Observe(t.tx.Stats().WriteTime.Seconds())
		commitSec.Observe(time.Since(start).Seconds())
		atomic.AddInt64(&t.backend.commits, 1)

		t.pending = 0
		if err != nil {
			t.backend.lg.Fatal("failed to commit tx", zap.Error(err))
		}
	}
	if !stop {
		t.tx = t.backend.begin(true)
	}
}
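
// The `// gofail:` comments inside commit are failpoint markers for
// go.etcd.io/gofail; they are inert until the failpoints are woven into the
// source at build time. A rough sketch of how a test run might trigger them
// (commands per gofail's README; the failpoint path is abbreviated here):
//
//	gofail enable ./server/storage/backend
//	GOFAIL_FAILPOINTS='beforeCommit=panic' go test ./server/storage/backend/...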

type batchTxBuffered struct {
	batchTx
	buf txWriteBuffer
}

func newBatchTxBuffered(backend *backend) *batchTxBuffered {
	tx := &batchTxBuffered{
		batchTx: batchTx{backend: backend},
		buf: txWriteBuffer{
			txBuffer:   txBuffer{make(map[BucketID]*bucketBuffer)},
			bucket2seq: make(map[BucketID]bool),
		},
	}
	tx.Commit()
	return tx
}

func (t *batchTxBuffered) Unlock() {
	if t.pending != 0 {
		t.backend.readTx.Lock() // blocks txReadBuffer for writing.
		t.buf.writeback(&t.backend.readTx.buf)
		t.backend.readTx.Unlock()
		if t.pending >= t.backend.batchLimit {
			t.commit(false)
		}
	}
	t.batchTx.Unlock()
}
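
// One consequence of the writeback above, with a hypothetical exampleBucket:
// once Unlock returns, concurrent readers already see the new key through the
// read buffer, even if boltdb has not committed the batch yet.
//
//	tx.LockOutsideApply()
//	tx.UnsafePut(exampleBucket, []byte("k"), []byte("v"))
//	tx.Unlock() // "k" is now visible to ReadTx users via txReadBuffer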

func (t *batchTxBuffered) Commit() {
	t.lock()
	t.commit(false)
	t.Unlock()
}

func (t *batchTxBuffered) CommitAndStop() {
	t.lock()
	t.commit(true)
	t.Unlock()
}

func (t *batchTxBuffered) commit(stop bool) {
	// all read txs must be closed to acquire boltdb commit rwlock
	t.backend.readTx.Lock()
	t.unsafeCommit(stop)
	t.backend.readTx.Unlock()
}

func (t *batchTxBuffered) unsafeCommit(stop bool) {
	if t.backend.hooks != nil {
		t.backend.hooks.OnPreCommitUnsafe(t)
	}
	if t.backend.readTx.tx != nil {
		// wait for all store read transactions using the current boltdb tx to finish,
		// then close the boltdb tx
		go func(tx *bolt.Tx, wg *sync.WaitGroup) {
			wg.Wait()
			if err := tx.Rollback(); err != nil {
				t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
			}
		}(t.backend.readTx.tx, t.backend.readTx.txWg)
		t.backend.readTx.reset()
	}

	t.batchTx.commit(stop)

	if !stop {
		t.backend.readTx.tx = t.backend.begin(false)
	}
}

func (t *batchTxBuffered) UnsafePut(bucket Bucket, key []byte, value []byte) {
	t.batchTx.UnsafePut(bucket, key, value)
	t.buf.put(bucket, key, value)
}

func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
	t.batchTx.UnsafeSeqPut(bucket, key, value)
	t.buf.putSeq(bucket, key, value)
}
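
// Illustrative contrast between the two put flavors (bucket and key names are
// hypothetical): UnsafeSeqPut marks the write as append-only, raising the
// bucket's fill percent from bbolt's 0.5 default to 0.9 to delay page splits,
// while UnsafePut keeps the default for random-order keys.
//
//	tx.UnsafeSeqPut(keyBucket, nextRevisionKey, value) // monotonically increasing keys
//	tx.UnsafePut(exampleBucket, arbitraryKey, value)   // arbitrary key order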