Merge 4db8df677c618b462145fce7cb926c072a0ce932 into c86c93ca2951338115159dcdd20711603044e1f1

Fu Wei 2024-09-25 21:36:55 -07:00 committed by GitHub
commit d63ca43092
11 changed files with 228 additions and 16 deletions

View File

@ -145,6 +145,14 @@ var (
// revision 5000 when the current revision is 6000.
// This runs every 5 minutes if enough new revisions have accumulated.
CompactorModeRevision = v3compactor.ModeRevision
// CompactorModeRevisionThreshold is the revision-change-based compaction
// mode for the "Config.AutoCompactionMode" field.
// If "AutoCompactionMode" is CompactorModeRevisionThreshold and
// "AutoCompactionRetention" is "10000", compaction runs whenever more
// than 10000 revisions have accumulated since the previous compaction,
// e.g. around revision 20000 when the previous compaction was at 10000.
CompactorModeRevisionThreshold = v3compactor.ModeRevisionThreshold
)
func init() {
@ -1056,7 +1064,7 @@ func (cfg *Config) Validate() error {
}
switch cfg.AutoCompactionMode {
case CompactorModeRevision, CompactorModePeriodic:
case CompactorModeRevision, CompactorModePeriodic, CompactorModeRevisionThreshold:
case "":
return errors.New("undefined auto-compaction-mode")
default:

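For context, a minimal sketch of enabling this mode through the embed package (names taken from the diff above; embed.Config stores AutoCompactionRetention as a string):

cfg := embed.NewConfig()
cfg.AutoCompactionMode = "revision-threshold" // string value of CompactorModeRevisionThreshold
cfg.AutoCompactionRetention = "10000"         // compact once >10000 revisions accumulate
// e, err := embed.StartEtcd(cfg) would then start a server with these settings.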
View File

@ -900,6 +900,8 @@ func parseCompactionRetention(mode, retention string) (ret time.Duration, err er
ret = time.Duration(int64(h))
case CompactorModePeriodic:
ret = time.Duration(int64(h)) * time.Hour
case CompactorModeRevisionThreshold:
ret = time.Duration(int64(h))
case "":
return 0, errors.New("--auto-compaction-mode is undefined")
}

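A hedged note on the new case above: for the revision-based modes the retention string is parsed as a plain integer, so the returned time.Duration carries a raw count rather than a wall-clock interval. Illustratively (parseCompactionRetention is unexported, so this only works inside package embed):

ret, _ := parseCompactionRetention(CompactorModeRevisionThreshold, "10000")
// ret == time.Duration(10000); callers recover the revision count with
// int64(ret), it is never interpreted as 10000 nanoseconds.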
View File

@ -28,6 +28,7 @@ import (
const (
ModePeriodic = "periodic"
ModeRevision = "revision"
ModeRevisionThreshold = "revision-threshold"
)
// Compactor purges old log from the storage periodically.
@ -47,6 +48,11 @@ type Compactable interface {
Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
}
type KvGetter interface {
RevGetter
CompactNotify() chan struct{}
}
type RevGetter interface {
Rev() int64
}
@ -56,17 +62,20 @@ func New(
lg *zap.Logger,
mode string,
retention time.Duration,
rg RevGetter,
kg KvGetter,
c Compactable,
) (Compactor, error) {
if lg == nil {
lg = zap.NewNop()
}
switch mode {
case ModePeriodic:
return newPeriodic(lg, clockwork.NewRealClock(), retention, rg, c), nil
return newPeriodic(lg, clockwork.NewRealClock(), retention, kg, c), nil
case ModeRevision:
return newRevision(lg, clockwork.NewRealClock(), int64(retention), rg, c), nil
return newRevision(lg, clockwork.NewRealClock(), int64(retention), kg, c), nil
case ModeRevisionThreshold:
return newRevisionThreshold(lg, clockwork.NewRealClock(), kg, c), nil
default:
return nil, fmt.Errorf("unsupported compaction mode %s", mode)
}

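To make the widened contract concrete, a hedged sketch of a minimal KvGetter implementation (a hypothetical test double, not part of this change):

type fakeKV struct {
	rev    int64
	notify chan struct{}
}

func (f *fakeKV) Rev() int64                   { return f.rev }
func (f *fakeKV) CompactNotify() chan struct{} { return f.notify }

// New(lg, ModeRevisionThreshold, 0, &fakeKV{notify: make(chan struct{}, 1)}, c)
// would then drive compactions from sends on the notify channel.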
View File

@ -0,0 +1,112 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package v3compactor
import (
"context"
"sync"
"time"
"github.com/jonboulle/clockwork"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/server/v3/storage/mvcc"
"go.uber.org/zap"
)
type RevisionThreshold struct {
lg *zap.Logger
clock clockwork.Clock
kv KvGetter
c Compactable
ctx context.Context
cancel context.CancelFunc
mu sync.Mutex
paused bool
}
func newRevisionThreshold(lg *zap.Logger, clock clockwork.Clock, kv KvGetter, c Compactable) *RevisionThreshold {
rc := &RevisionThreshold{
lg: lg,
clock: clock,
kv: kv,
c: c,
}
rc.ctx, rc.cancel = context.WithCancel(context.Background())
return rc
}
func (rc *RevisionThreshold) Run() {
go func() {
for {
select {
case <-rc.ctx.Done():
return
case <-rc.kv.CompactNotify():
rc.mu.Lock()
p := rc.paused
rc.mu.Unlock()
if p {
continue
}
}
rev := rc.kv.Rev()
now := time.Now()
rc.lg.Info(
"starting auto revision compaction",
zap.Int64("revision", rev),
)
_, err := rc.c.Compact(rc.ctx, &pb.CompactionRequest{Revision: rev})
if err == nil || err == mvcc.ErrCompacted {
rc.lg.Info(
"completed auto revision compaction",
zap.Int64("revision", rev),
zap.Duration("took", time.Since(now)),
)
} else {
rc.lg.Warn(
"failed auto revision compaction",
zap.Int64("revision", rev),
zap.Error(err),
)
}
}
}()
}
// Stop stops the revision-threshold compactor.
func (rc *RevisionThreshold) Stop() {
rc.cancel()
}
// Pause pauses the revision-threshold compactor.
func (rc *RevisionThreshold) Pause() {
rc.mu.Lock()
rc.paused = true
rc.mu.Unlock()
}
// Resume resumes the revision-threshold compactor.
func (rc *RevisionThreshold) Resume() {
rc.mu.Lock()
rc.paused = false
rc.mu.Unlock()
}

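A hedged usage sketch of the lifecycle defined above (etcdserver performs this wiring through v3compactor.New; the variable names are illustrative):

rc := newRevisionThreshold(lg, clockwork.NewRealClock(), kv, compactable)
rc.Run()    // spawns the goroutine waiting on kv.CompactNotify()
rc.Pause()  // notifications are still drained but ignored
rc.Resume() // notifications trigger compactions again
rc.Stop()   // cancels the context, ending the goroutine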
View File

@ -370,6 +370,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
CompactionBatchLimit: cfg.CompactionBatchLimit,
CompactionSleepInterval: cfg.CompactionSleepInterval,
}
if cfg.AutoCompactionMode == v3compactor.ModeRevisionThreshold {
mvccStoreConfig.CompactionNotifyThreshold = int64(cfg.AutoCompactionRetention)
}
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
srv.corruptionChecker = newCorruptionChecker(cfg.Logger, srv, srv.kv.HashStorage())
@ -384,7 +388,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
}
}()
if num := cfg.AutoCompactionRetention; num != 0 {
srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, any(srv.kv).(kvGetter), srv)
if err != nil {
return nil, err
}
@ -2497,3 +2501,8 @@ func (s *EtcdServer) getTxPostLockInsideApplyHook() func() {
func (s *EtcdServer) CorruptionChecker() CorruptionChecker {
return s.corruptionChecker
}
type kvGetter interface {
CompactNotify() chan struct{}
Rev() int64
}

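A hedged sketch of the conversion above: parseCompactionRetention stored the raw revision count in a time.Duration, and int64(...) recovers it unchanged:

retention := time.Duration(int64(10000)) // from --auto-compaction-retention=10000
threshold := int64(retention)            // 10000 revisions, not nanoseconds
// threshold is what lands in mvccStoreConfig.CompactionNotifyThreshold.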
View File

@ -45,6 +45,10 @@ var defaultCompactionSleepInterval = 10 * time.Millisecond
type StoreConfig struct {
CompactionBatchLimit int
CompactionSleepInterval time.Duration
// CompactionNotifyThreshold is used to guarantee that a notification
// is sent only after the configured number of revisions have been
// written since the previous compaction.
CompactionNotifyThreshold int64
}
type store struct {
@ -69,6 +73,9 @@ type store struct {
currentRev int64
// compactMainRev is the main revision of the last compaction.
compactMainRev int64
// compactNotifyCh is used to notify the compactor that it's time to
// compact.
compactNotifyCh chan struct{}
fifoSched schedule.Scheduler
@ -99,6 +106,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi
currentRev: 1,
compactMainRev: -1,
compactNotifyCh: make(chan struct{}, 1),
fifoSched: schedule.NewFIFOScheduler(lg),
@ -501,6 +509,9 @@ func restoreChunk(lg *zap.Logger, kvc chan<- revKeyValue, keys, vals [][]byte, k
func (s *store) Close() error {
close(s.stopc)
if s.compactNotifyCh != nil {
close(s.compactNotifyCh)
}
s.fifoSched.Stop()
return nil
}
@ -535,3 +546,22 @@ func (s *store) setupMetricsReporter() {
func (s *store) HashStorage() HashStorage {
return s.hashes
}
func (s *store) CompactNotify() chan struct{} {
return s.compactNotifyCh
}
func (s *store) doCompactNotify() {
threshold := s.cfg.CompactionNotifyThreshold
if threshold <= 0 {
return
}
if s.currentRev-s.compactMainRev > threshold {
select {
case s.compactNotifyCh <- struct{}{}:
default:
}
}
}

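A self-contained sketch of the coalescing send used by doCompactNotify above; the size-1 buffered channel guarantees at most one pending notification, so a slow compactor never blocks writers (maybeNotify is a hypothetical standalone rendering):

notify := make(chan struct{}, 1)
maybeNotify := func(currentRev, compactMainRev, threshold int64) {
	if threshold <= 0 {
		return // feature disabled
	}
	if currentRev-compactMainRev > threshold {
		select {
		case notify <- struct{}{}:
		default: // a notification is already pending; coalesce
		}
	}
}
maybeNotify(15001, 5000, 10000) // sends exactly one notification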
View File

@ -185,6 +185,8 @@ func (tw *storeTxnWrite) End() {
// hold revMu lock to prevent new read txns from opening until writeback.
tw.s.revMu.Lock()
tw.s.currentRev++
tw.s.doCompactNotify()
}
tw.tx.Unlock()
if len(tw.changes) != 0 {

View File

@ -31,6 +31,10 @@ const (
// MaxQuotaBytes is the maximum number of bytes suggested for a backend
// quota. A larger quota may lead to degraded performance.
MaxQuotaBytes = int64(8 * 1024 * 1024 * 1024) // 8GB
// MaxAllowedOverflowQuotaBytes is the maximum number of bytes by which
// the backend size may overflow the space quota.
MaxAllowedOverflowQuotaBytes = int64(1024 * 1024 * 1024) // 1GB
)
// Quota represents an arbitrary quota against arbitrary requests. Each request
@ -130,7 +134,17 @@ func (b *BackendQuota) Available(v any) bool {
return true
}
// TODO: maybe optimize Backend.Size()
return b.be.Size()+int64(cost) < b.maxBackendBytes
// Since compaction frees pages that can be reallocated, we check
// SizeInUse first. If there are no contiguous free pages for the
// key/value data and boltdb keeps resizing the file, the total size
// must not grow past the quota by more than the allowed overflow
// (at most 1 GiB). This is a hard limit.
//
// TODO: this behavior should be gated behind a flag.
if b.be.Size()+int64(cost)-b.maxBackendBytes >= maxAllowedOverflowBytes(b.maxBackendBytes) {
return false
}
return b.be.SizeInUse()+int64(cost) < b.maxBackendBytes
}
func (b *BackendQuota) Cost(v any) int {
@ -174,3 +188,11 @@ func costTxn(r *pb.TxnRequest) int {
func (b *BackendQuota) Remaining() int64 {
return b.maxBackendBytes - b.be.Size()
}
// maxAllowedOverflowBytes returns 10% of the configured quota, capped
// at MaxAllowedOverflowQuotaBytes.
func maxAllowedOverflowBytes(maxBackendBytes int64) int64 {
allow := maxBackendBytes * 10 / 100
if allow > MaxAllowedOverflowQuotaBytes {
allow = MaxAllowedOverflowQuotaBytes
}
return allow
}

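A hedged worked example of maxAllowedOverflowBytes, following the integer arithmetic above:

maxAllowedOverflowBytes(4 * 1024 * 1024 * 1024)  // 4 GiB quota: 10% = 429496729 bytes (~410 MiB), under the cap
maxAllowedOverflowBytes(16 * 1024 * 1024 * 1024) // 16 GiB quota: 10% = ~1.6 GiB, capped to 1073741824 (1 GiB)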
View File

@ -44,6 +44,7 @@ var putCmd = &cobra.Command{
var (
keySize int
valSize int
deltaValSize int
putTotal int
putRate int
@ -61,6 +62,7 @@ func init() {
RootCmd.AddCommand(putCmd)
putCmd.Flags().IntVar(&keySize, "key-size", 8, "Key size of put request")
putCmd.Flags().IntVar(&valSize, "val-size", 8, "Value size of put request")
putCmd.Flags().IntVar(&deltaValSize, "delta-val-size", 0, "Maximum random delta applied to the value size of each put request")
putCmd.Flags().IntVar(&putRate, "rate", 0, "Maximum puts per second (0 is no limit)")
putCmd.Flags().IntVar(&putTotal, "total", 10000, "Total number of put requests")
@ -83,7 +85,7 @@ func putFunc(cmd *cobra.Command, _ []string) {
}
limit := rate.NewLimiter(rate.Limit(putRate), 1)
clients := mustCreateClients(totalClients, totalConns)
k, v := make([]byte, keySize), string(mustRandBytes(valSize))
k, v := make([]byte, keySize), string(mustRandBytes(valSize+deltaValSize))
bar = pb.New(putTotal)
bar.Start()
@ -104,6 +106,7 @@ func putFunc(cmd *cobra.Command, _ []string) {
}(clients[i])
}
baseValSize := valSize - deltaValSize
go func() {
for i := 0; i < putTotal; i++ {
if seqKeys {
@ -111,7 +114,11 @@ func putFunc(cmd *cobra.Command, _ []string) {
} else {
binary.PutVarint(k, int64(rand.Intn(keySpaceSize)))
}
requests <- v3.OpPut(string(k), v)
deltaV := v
if deltaValSize > 0 {
deltaV = v[:baseValSize+rand.Intn(2*deltaValSize)]
}
requests <- v3.OpPut(string(k), deltaV)
}
close(requests)
}()

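A hedged invocation sketch: with the new flag, value lengths vary uniformly in [val-size - delta, val-size + delta):

benchmark put --key-size 8 --val-size 256 --delta-val-size 64 --total 100000

Here each value is between 192 and 319 bytes long, since the slice length is baseValSize + rand.Intn(2*deltaValSize).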
View File

@ -15,6 +15,7 @@
package cmd
import (
"math/rand"
"sync"
"time"
@ -58,6 +59,8 @@ var (
)
func init() {
rand.Seed(time.Now().UnixNano())
RootCmd.PersistentFlags().StringSliceVar(&endpoints, "endpoints", []string{"127.0.0.1:2379"}, "gRPC endpoints")
RootCmd.PersistentFlags().UintVar(&totalConns, "conns", 1, "Total number of gRPC connections")
RootCmd.PersistentFlags().UintVar(&totalClients, "clients", 1, "Total number of gRPC clients")

View File

@ -19,6 +19,7 @@ import (
"encoding/binary"
"fmt"
"math"
"math/rand"
"os"
"time"
@ -48,6 +49,7 @@ func init() {
RootCmd.AddCommand(txnPutCmd)
txnPutCmd.Flags().IntVar(&keySize, "key-size", 8, "Key size of txn put")
txnPutCmd.Flags().IntVar(&valSize, "val-size", 8, "Value size of txn put")
txnPutCmd.Flags().IntVar(&deltaValSize, "delta-val-size", 0, "Maximum random delta applied to the value size of each put request")
txnPutCmd.Flags().IntVar(&txnPutOpsPerTxn, "txn-ops", 1, "Number of puts per txn")
txnPutCmd.Flags().IntVar(&txnPutRate, "rate", 0, "Maximum txns per second (0 is no limit)")
@ -73,7 +75,7 @@ func txnPutFunc(_ *cobra.Command, _ []string) {
}
limit := rate.NewLimiter(rate.Limit(txnPutRate), 1)
clients := mustCreateClients(totalClients, totalConns)
k, v := make([]byte, keySize), string(mustRandBytes(valSize))
k, v := make([]byte, keySize), string(mustRandBytes(valSize+deltaValSize))
bar = pb.New(txnPutTotal)
bar.Start()
@ -93,12 +95,18 @@ func txnPutFunc(_ *cobra.Command, _ []string) {
}(clients[i])
}
baseValSize := valSize - deltaValSize
go func() {
for i := 0; i < txnPutTotal; i++ {
ops := make([]v3.Op, txnPutOpsPerTxn)
for j := 0; j < txnPutOpsPerTxn; j++ {
binary.PutVarint(k, int64(((i*txnPutOpsPerTxn)+j)%keySpaceSize))
ops[j] = v3.OpPut(string(k), v)
deltaV := v
if deltaValSize > 0 {
deltaV = v[:baseValSize+rand.Intn(2*deltaValSize)]
}
ops[j] = v3.OpPut(string(k), deltaV)
}
requests <- ops
}
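The txn-put benchmark gains the same knob; a hedged invocation sketch:

benchmark txn-put --txn-ops 4 --val-size 256 --delta-val-size 64 --total 10000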