// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package backend

import (
    "bytes"
    "math"
    "sync"
    "sync/atomic"
    "time"

    bolt "go.etcd.io/bbolt"
    "go.uber.org/zap"
)

type BucketID int

type Bucket interface {
    // ID returns a unique identifier of a bucket.
    // The id must NOT be persisted and can be used as a lightweight identifier
    // in the in-memory maps.
    ID() BucketID
    Name() []byte
    // String implements Stringer (human readable name).
    String() string

    // IsSafeRangeBucket is a hack to avoid inadvertently reading duplicate keys;
    // overwrites on a bucket should only fetch with limit=1, but safeRangeBucket
    // is known to never overwrite any key, so range is safe.
    IsSafeRangeBucket() bool
}

type BatchTx interface {
    ReadTx
    UnsafeCreateBucket(bucket Bucket)
    UnsafeDeleteBucket(bucket Bucket)
    UnsafePut(bucket Bucket, key []byte, value []byte)
    UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
    UnsafeDelete(bucket Bucket, key []byte)
    // Commit commits a previous tx and begins a new writable one.
    Commit()
    // CommitAndStop commits the previous tx and does not create a new one.
    CommitAndStop()
    LockInsideApply()
    LockOutsideApply()
}
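
// Illustrative usage (a sketch, not part of the original file): callers are
// expected to hold the batch tx lock for the duration of their Unsafe* calls,
// e.g.
//
//	tx := be.BatchTx() // assuming a Backend value `be`
//	tx.LockOutsideApply()
//	tx.UnsafePut(bucket, key, value)
//	tx.Unlock() // may trigger a commit once the batch limit is reached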

type batchTx struct {
    sync.Mutex
    tx      *bolt.Tx
    backend *backend

    pending int
}
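
// pending (above) counts the write operations buffered in the current bolt
// transaction; Unlock commits automatically once it reaches backend.batchLimit.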

// Lock is supposed to be called only by unit tests.
func (t *batchTx) Lock() {
    ValidateCalledInsideUnittest(t.backend.lg)
    t.lock()
}

func (t *batchTx) lock() {
    t.Mutex.Lock()
}

func (t *batchTx) LockInsideApply() {
    t.lock()
    if t.backend.txPostLockInsideApplyHook != nil {
        // The callers of some methods (e.g., (*RaftCluster).AddMember)
        // can come from both InsideApply and OutsideApply, but the
        // callers from OutsideApply will have a nil txPostLockInsideApplyHook.
        // So we should check txPostLockInsideApplyHook before validating
        // the callstack.
        ValidateCalledInsideApply(t.backend.lg)
        t.backend.txPostLockInsideApplyHook()
    }
}

func (t *batchTx) LockOutsideApply() {
    ValidateCalledOutSideApply(t.backend.lg)
    t.lock()
}
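
// Unlock commits the batched writes once pending reaches the backend's
// batchLimit, then releases the mutex.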
func (t *batchTx) Unlock() {
    if t.pending >= t.backend.batchLimit {
        t.commit(false)
    }
    t.Mutex.Unlock()
}

// BatchTx interface embeds the ReadTx interface, but RLock() and RUnlock() do
// not have appropriate semantics for a BatchTx, so they must not be called.
// TODO: might want to decouple ReadTx and BatchTx.

func (t *batchTx) RLock() {
    panic("unexpected RLock")
}

func (t *batchTx) RUnlock() {
    panic("unexpected RUnlock")
}

func (t *batchTx) UnsafeCreateBucket(bucket Bucket) {
    _, err := t.tx.CreateBucket(bucket.Name())
    if err != nil && err != bolt.ErrBucketExists {
        t.backend.lg.Fatal(
            "failed to create a bucket",
            zap.Stringer("bucket-name", bucket),
            zap.Error(err),
        )
    }
    t.pending++
}

func (t *batchTx) UnsafeDeleteBucket(bucket Bucket) {
    err := t.tx.DeleteBucket(bucket.Name())
    if err != nil && err != bolt.ErrBucketNotFound {
        t.backend.lg.Fatal(
            "failed to delete a bucket",
            zap.Stringer("bucket-name", bucket),
            zap.Error(err),
        )
    }
    t.pending++
}

// UnsafePut must be called holding the lock on the tx.
func (t *batchTx) UnsafePut(bucket Bucket, key []byte, value []byte) {
    t.unsafePut(bucket, key, value, false)
}

// UnsafeSeqPut must be called holding the lock on the tx.
func (t *batchTx) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
    t.unsafePut(bucket, key, value, true)
}

func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq bool) {
    bucket := t.tx.Bucket(bucketType.Name())
    if bucket == nil {
        t.backend.lg.Fatal(
            "failed to find a bucket",
            zap.Stringer("bucket-name", bucketType),
            zap.Stack("stack"),
        )
    }
    if seq {
        // It is useful to increase the fill percent when the workloads are mostly append-only.
        // This can delay the page split and reduce space usage.
        bucket.FillPercent = 0.9
    }
    if err := bucket.Put(key, value); err != nil {
        t.backend.lg.Fatal(
            "failed to write to a bucket",
            zap.Stringer("bucket-name", bucketType),
            zap.Error(err),
        )
    }
    t.pending++
}

// UnsafeRange must be called holding the lock on the tx.
func (t *batchTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
    bucket := t.tx.Bucket(bucketType.Name())
    if bucket == nil {
        t.backend.lg.Fatal(
            "failed to find a bucket",
            zap.Stringer("bucket-name", bucketType),
            zap.Stack("stack"),
        )
    }
    return unsafeRange(bucket.Cursor(), key, endKey, limit)
}
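
// unsafeRange seeks the cursor to key and collects key/value pairs. When
// endKey is non-empty it returns keys in the half-open interval [key, endKey),
// up to limit entries (limit <= 0 means no limit); when endKey is empty it
// returns at most the single exact match for key.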
func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
    if limit <= 0 {
        limit = math.MaxInt64
    }
    var isMatch func(b []byte) bool
    if len(endKey) > 0 {
        isMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 }
    } else {
        isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
        limit = 1
    }

    for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
        vs = append(vs, cv)
        keys = append(keys, ck)
        if limit == int64(len(keys)) {
            break
        }
    }
    return keys, vs
}

// UnsafeDelete must be called holding the lock on the tx.
func (t *batchTx) UnsafeDelete(bucketType Bucket, key []byte) {
    bucket := t.tx.Bucket(bucketType.Name())
    if bucket == nil {
        t.backend.lg.Fatal(
            "failed to find a bucket",
            zap.Stringer("bucket-name", bucketType),
            zap.Stack("stack"),
        )
    }
    err := bucket.Delete(key)
    if err != nil {
        t.backend.lg.Fatal(
            "failed to delete a key",
            zap.Stringer("bucket-name", bucketType),
            zap.Error(err),
        )
    }
    t.pending++
}

// UnsafeForEach must be called holding the lock on the tx.
func (t *batchTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error {
    return unsafeForEach(t.tx, bucket, visitor)
}
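
// unsafeForEach visits every key/value pair in the bucket, if the bucket
// exists; a missing bucket is treated as empty rather than as an error.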
func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error) error {
    if b := tx.Bucket(bucket.Name()); b != nil {
        return b.ForEach(visitor)
    }
    return nil
}

// Commit commits a previous tx and begins a new writable one.
func (t *batchTx) Commit() {
    t.lock()
    t.commit(false)
    t.Unlock()
}

// CommitAndStop commits the previous tx and does not create a new one.
func (t *batchTx) CommitAndStop() {
    t.lock()
    t.commit(true)
    t.Unlock()
}

func (t *batchTx) safePending() int {
    t.Mutex.Lock()
    defer t.Mutex.Unlock()
    return t.pending
}
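
// commit flushes the current bolt transaction when there are pending writes
// (or unconditionally when stop is true), records the commit latency metrics,
// and, unless stop is set, immediately begins a new writable transaction.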
func (t *batchTx) commit(stop bool) {
    // commit the last tx
    if t.tx != nil {
        if t.pending == 0 && !stop {
            return
        }

        start := time.Now()

        // gofail: var beforeCommit struct{}
        err := t.tx.Commit()
        // gofail: var afterCommit struct{}

        rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds())
        spillSec.Observe(t.tx.Stats().SpillTime.Seconds())
        writeSec.Observe(t.tx.Stats().WriteTime.Seconds())
        commitSec.Observe(time.Since(start).Seconds())
        atomic.AddInt64(&t.backend.commits, 1)

        t.pending = 0
        if err != nil {
            t.backend.lg.Fatal("failed to commit tx", zap.Error(err))
        }
    }
    if !stop {
        t.tx = t.backend.begin(true)
    }
}

type batchTxBuffered struct {
    batchTx
    buf                     txWriteBuffer
    pendingDeleteOperations int
}
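
// buf (above) mirrors every put into an in-memory write buffer so reads can
// see uncommitted data; deletes have no such buffer, so pendingDeleteOperations
// tracks them and forces an immediate commit in Unlock (see the comment there).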

func newBatchTxBuffered(backend *backend) *batchTxBuffered {
    tx := &batchTxBuffered{
        batchTx: batchTx{backend: backend},
        buf: txWriteBuffer{
            txBuffer:   txBuffer{make(map[BucketID]*bucketBuffer)},
            bucket2seq: make(map[BucketID]bool),
        },
    }
    // Commit here opens the initial writable and read-only bolt transactions.
    tx.Commit()
    return tx
}

func (t *batchTxBuffered) Unlock() {
    if t.pending != 0 {
        t.backend.readTx.Lock() // blocks txReadBuffer for writing.
        // gofail: var beforeWritebackBuf struct{}
        t.buf.writeback(&t.backend.readTx.buf)
        t.backend.readTx.Unlock()
        // We commit the transaction when the number of pending operations
        // reaches the configured limit (batchLimit) to prevent it from
        // becoming excessively large.
        //
        // But we also need to commit the transaction immediately if there
        // is any pending delete operation; otherwise etcd might run into
        // a situation where it hasn't finished committing the data into the
        // backend storage (note: etcd periodically commits the bbolt
        // transactions instead of committing on each request) when it applies
        // the next request. Accordingly, etcd may still read stale data from
        // bbolt when processing the next request, which breaks linearizability.
        //
        // Note we don't need to commit the transaction for put requests if
        // the batch limit isn't exceeded, because there is a buffer on top
        // of bbolt. Each time etcd reads data from the backend storage, it
        // reads from both bbolt and the buffer. But there is no such buffer
        // for delete requests.
        //
        // Please also refer to
        // https://github.com/etcd-io/etcd/pull/17119#issuecomment-1857547158
        if t.pending >= t.backend.batchLimit || t.pendingDeleteOperations > 0 {
            t.commit(false)
        }
    }
    t.batchTx.Unlock()
}

func (t *batchTxBuffered) Commit() {
    t.lock()
    t.commit(false)
    t.Unlock()
}

func (t *batchTxBuffered) CommitAndStop() {
    t.lock()
    t.commit(true)
    t.Unlock()
}

func (t *batchTxBuffered) commit(stop bool) {
    // all read txs must be closed to acquire boltdb commit rwlock
    t.backend.readTx.Lock()
    t.unsafeCommit(stop)
    t.backend.readTx.Unlock()
}

func (t *batchTxBuffered) unsafeCommit(stop bool) {
    if t.backend.hooks != nil {
        t.backend.hooks.OnPreCommitUnsafe(t)
    }
    if t.backend.readTx.tx != nil {
        // wait for all store read transactions using the current boltdb tx
        // to finish, then close the boltdb tx
        go func(tx *bolt.Tx, wg *sync.WaitGroup) {
            wg.Wait()
            if err := tx.Rollback(); err != nil {
                t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
            }
        }(t.backend.readTx.tx, t.backend.readTx.txWg)
        t.backend.readTx.reset()
    }

    t.batchTx.commit(stop)
    t.pendingDeleteOperations = 0

    if !stop {
        t.backend.readTx.tx = t.backend.begin(false)
    }
}

func (t *batchTxBuffered) UnsafePut(bucket Bucket, key []byte, value []byte) {
    t.batchTx.UnsafePut(bucket, key, value)
    t.buf.put(bucket, key, value)
}

func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
    t.batchTx.UnsafeSeqPut(bucket, key, value)
    t.buf.putSeq(bucket, key, value)
}
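
// Deletes are not mirrored into the write buffer, so they are counted in
// pendingDeleteOperations; Unlock commits immediately whenever that count is
// non-zero (see the comment in Unlock above).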
func (t *batchTxBuffered) UnsafeDelete(bucketType Bucket, key []byte) {
    t.batchTx.UnsafeDelete(bucketType, key)
    t.pendingDeleteOperations++
}

func (t *batchTxBuffered) UnsafeDeleteBucket(bucket Bucket) {
    t.batchTx.UnsafeDeleteBucket(bucket)
    t.pendingDeleteOperations++
}