mvcc: remove capnslog

This commit is contained in:
jingyih 2020-02-05 08:18:15 -08:00
parent 7395ed8e5d
commit 6b389bf23c
8 changed files with 149 additions and 315 deletions

View File

@ -25,7 +25,6 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/coreos/pkg/capnslog"
humanize "github.com/dustin/go-humanize" humanize "github.com/dustin/go-humanize"
bolt "go.etcd.io/bbolt" bolt "go.etcd.io/bbolt"
"go.uber.org/zap" "go.uber.org/zap"
@ -42,8 +41,6 @@ var (
// This only works for linux. // This only works for linux.
initialMmapSize = uint64(10 * 1024 * 1024 * 1024) initialMmapSize = uint64(10 * 1024 * 1024 * 1024)
plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "mvcc/backend")
// minSnapshotWarningTimeout is the minimum threshold to trigger a long running snapshot warning. // minSnapshotWarningTimeout is the minimum threshold to trigger a long running snapshot warning.
minSnapshotWarningTimeout = 30 * time.Second minSnapshotWarningTimeout = 30 * time.Second
) )
@ -144,6 +141,10 @@ func NewDefaultBackend(path string) Backend {
} }
func newBackend(bcfg BackendConfig) *backend { func newBackend(bcfg BackendConfig) *backend {
if bcfg.Logger == nil {
bcfg.Logger = zap.NewNop()
}
bopts := &bolt.Options{} bopts := &bolt.Options{}
if boltOpenOptions != nil { if boltOpenOptions != nil {
*bopts = *boltOpenOptions *bopts = *boltOpenOptions
@ -153,11 +154,7 @@ func newBackend(bcfg BackendConfig) *backend {
db, err := bolt.Open(bcfg.Path, 0600, bopts) db, err := bolt.Open(bcfg.Path, 0600, bopts)
if err != nil { if err != nil {
if bcfg.Logger != nil {
bcfg.Logger.Panic("failed to open database", zap.String("path", bcfg.Path), zap.Error(err)) bcfg.Logger.Panic("failed to open database", zap.String("path", bcfg.Path), zap.Error(err))
} else {
plog.Panicf("cannot open database at %s (%v)", bcfg.Path, err)
}
} }
// In future, may want to make buffering optional for low-concurrency systems // In future, may want to make buffering optional for low-concurrency systems
@ -225,11 +222,7 @@ func (b *backend) Snapshot() Snapshot {
defer b.mu.RUnlock() defer b.mu.RUnlock()
tx, err := b.db.Begin(false) tx, err := b.db.Begin(false)
if err != nil { if err != nil {
if b.lg != nil {
b.lg.Fatal("failed to begin tx", zap.Error(err)) b.lg.Fatal("failed to begin tx", zap.Error(err))
} else {
plog.Fatalf("cannot begin tx (%s)", err)
}
} }
stopc, donec := make(chan struct{}), make(chan struct{}) stopc, donec := make(chan struct{}), make(chan struct{})
@ -249,16 +242,12 @@ func (b *backend) Snapshot() Snapshot {
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
if b.lg != nil {
b.lg.Warn( b.lg.Warn(
"snapshotting taking too long to transfer", "snapshotting taking too long to transfer",
zap.Duration("taking", time.Since(start)), zap.Duration("taking", time.Since(start)),
zap.Int64("bytes", dbBytes), zap.Int64("bytes", dbBytes),
zap.String("size", humanize.Bytes(uint64(dbBytes))), zap.String("size", humanize.Bytes(uint64(dbBytes))),
) )
} else {
plog.Warningf("snapshotting is taking more than %v seconds to finish transferring %v MB [started at %v]", time.Since(start).Seconds(), float64(dbBytes)/float64(1024*1024), start)
}
case <-stopc: case <-stopc:
snapshotTransferSec.Observe(time.Since(start).Seconds()) snapshotTransferSec.Observe(time.Since(start).Seconds())
@ -392,47 +381,27 @@ func (b *backend) defrag() error {
if err != nil { if err != nil {
tmpdb.Close() tmpdb.Close()
if rmErr := os.RemoveAll(tmpdb.Path()); rmErr != nil { if rmErr := os.RemoveAll(tmpdb.Path()); rmErr != nil {
if b.lg != nil {
b.lg.Error("failed to remove dirs under tmpdb", zap.Error(rmErr)) b.lg.Error("failed to remove dirs under tmpdb", zap.Error(rmErr))
} else {
plog.Errorf("failed to remove dirs under tmpdb (%s)", rmErr)
}
} }
return err return err
} }
err = b.db.Close() err = b.db.Close()
if err != nil { if err != nil {
if b.lg != nil {
b.lg.Fatal("failed to close database", zap.Error(err)) b.lg.Fatal("failed to close database", zap.Error(err))
} else {
plog.Fatalf("cannot close database (%s)", err)
}
} }
err = tmpdb.Close() err = tmpdb.Close()
if err != nil { if err != nil {
if b.lg != nil {
b.lg.Fatal("failed to close tmp database", zap.Error(err)) b.lg.Fatal("failed to close tmp database", zap.Error(err))
} else {
plog.Fatalf("cannot close database (%s)", err)
}
} }
err = os.Rename(tdbp, dbp) err = os.Rename(tdbp, dbp)
if err != nil { if err != nil {
if b.lg != nil {
b.lg.Fatal("failed to rename tmp database", zap.Error(err)) b.lg.Fatal("failed to rename tmp database", zap.Error(err))
} else {
plog.Fatalf("cannot rename database (%s)", err)
}
} }
b.db, err = bolt.Open(dbp, 0600, boltOpenOptions) b.db, err = bolt.Open(dbp, 0600, boltOpenOptions)
if err != nil { if err != nil {
if b.lg != nil {
b.lg.Fatal("failed to open database", zap.String("path", dbp), zap.Error(err)) b.lg.Fatal("failed to open database", zap.String("path", dbp), zap.Error(err))
} else {
plog.Panicf("cannot open database at %s (%v)", dbp, err)
}
} }
b.batchTx.tx = b.unsafeBegin(true) b.batchTx.tx = b.unsafeBegin(true)
@ -541,11 +510,7 @@ func (b *backend) begin(write bool) *bolt.Tx {
func (b *backend) unsafeBegin(write bool) *bolt.Tx { func (b *backend) unsafeBegin(write bool) *bolt.Tx {
tx, err := b.db.Begin(write) tx, err := b.db.Begin(write)
if err != nil { if err != nil {
if b.lg != nil {
b.lg.Fatal("failed to begin tx", zap.Error(err)) b.lg.Fatal("failed to begin tx", zap.Error(err))
} else {
plog.Fatalf("cannot begin tx (%s)", err)
}
} }
return tx return tx
} }

View File

@ -71,15 +71,11 @@ func (t *batchTx) RUnlock() {
func (t *batchTx) UnsafeCreateBucket(name []byte) { func (t *batchTx) UnsafeCreateBucket(name []byte) {
_, err := t.tx.CreateBucket(name) _, err := t.tx.CreateBucket(name)
if err != nil && err != bolt.ErrBucketExists { if err != nil && err != bolt.ErrBucketExists {
if t.backend.lg != nil {
t.backend.lg.Fatal( t.backend.lg.Fatal(
"failed to create a bucket", "failed to create a bucket",
zap.String("bucket-name", string(name)), zap.String("bucket-name", string(name)),
zap.Error(err), zap.Error(err),
) )
} else {
plog.Fatalf("cannot create bucket %s (%v)", name, err)
}
} }
t.pending++ t.pending++
} }
@ -97,14 +93,10 @@ func (t *batchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) { func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
bucket := t.tx.Bucket(bucketName) bucket := t.tx.Bucket(bucketName)
if bucket == nil { if bucket == nil {
if t.backend.lg != nil {
t.backend.lg.Fatal( t.backend.lg.Fatal(
"failed to find a bucket", "failed to find a bucket",
zap.String("bucket-name", string(bucketName)), zap.String("bucket-name", string(bucketName)),
) )
} else {
plog.Fatalf("bucket %s does not exist", bucketName)
}
} }
if seq { if seq {
// it is useful to increase fill percent when the workloads are mostly append-only. // it is useful to increase fill percent when the workloads are mostly append-only.
@ -112,15 +104,11 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo
bucket.FillPercent = 0.9 bucket.FillPercent = 0.9
} }
if err := bucket.Put(key, value); err != nil { if err := bucket.Put(key, value); err != nil {
if t.backend.lg != nil {
t.backend.lg.Fatal( t.backend.lg.Fatal(
"failed to write to a bucket", "failed to write to a bucket",
zap.String("bucket-name", string(bucketName)), zap.String("bucket-name", string(bucketName)),
zap.Error(err), zap.Error(err),
) )
} else {
plog.Fatalf("cannot put key into bucket (%v)", err)
}
} }
t.pending++ t.pending++
} }
@ -129,14 +117,10 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo
func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) { func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
bucket := t.tx.Bucket(bucketName) bucket := t.tx.Bucket(bucketName)
if bucket == nil { if bucket == nil {
if t.backend.lg != nil {
t.backend.lg.Fatal( t.backend.lg.Fatal(
"failed to find a bucket", "failed to find a bucket",
zap.String("bucket-name", string(bucketName)), zap.String("bucket-name", string(bucketName)),
) )
} else {
plog.Fatalf("bucket %s does not exist", bucketName)
}
} }
return unsafeRange(bucket.Cursor(), key, endKey, limit) return unsafeRange(bucket.Cursor(), key, endKey, limit)
} }
@ -167,26 +151,18 @@ func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte
func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) { func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
bucket := t.tx.Bucket(bucketName) bucket := t.tx.Bucket(bucketName)
if bucket == nil { if bucket == nil {
if t.backend.lg != nil {
t.backend.lg.Fatal( t.backend.lg.Fatal(
"failed to find a bucket", "failed to find a bucket",
zap.String("bucket-name", string(bucketName)), zap.String("bucket-name", string(bucketName)),
) )
} else {
plog.Fatalf("bucket %s does not exist", bucketName)
}
} }
err := bucket.Delete(key) err := bucket.Delete(key)
if err != nil { if err != nil {
if t.backend.lg != nil {
t.backend.lg.Fatal( t.backend.lg.Fatal(
"failed to delete a key", "failed to delete a key",
zap.String("bucket-name", string(bucketName)), zap.String("bucket-name", string(bucketName)),
zap.Error(err), zap.Error(err),
) )
} else {
plog.Fatalf("cannot delete key from bucket (%v)", err)
}
} }
t.pending++ t.pending++
} }
@ -244,11 +220,7 @@ func (t *batchTx) commit(stop bool) {
t.pending = 0 t.pending = 0
if err != nil { if err != nil {
if t.backend.lg != nil {
t.backend.lg.Fatal("failed to commit tx", zap.Error(err)) t.backend.lg.Fatal("failed to commit tx", zap.Error(err))
} else {
plog.Fatalf("cannot commit tx (%s)", err)
}
} }
} }
if !stop { if !stop {
@ -311,11 +283,7 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) {
go func(tx *bolt.Tx, wg *sync.WaitGroup) { go func(tx *bolt.Tx, wg *sync.WaitGroup) {
wg.Wait() wg.Wait()
if err := tx.Rollback(); err != nil { if err := tx.Rollback(); err != nil {
if t.backend.lg != nil {
t.backend.lg.Fatal("failed to rollback tx", zap.Error(err)) t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
} else {
plog.Fatalf("cannot rollback tx (%s)", err)
}
} }
}(t.backend.readTx.tx, t.backend.readTx.txWg) }(t.backend.readTx.tx, t.backend.readTx.txWg)
t.backend.readTx.reset() t.backend.readTx.reset()

View File

@ -185,11 +185,7 @@ func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision {
func (ti *treeIndex) Compact(rev int64) map[revision]struct{} { func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
available := make(map[revision]struct{}) available := make(map[revision]struct{})
if ti.lg != nil {
ti.lg.Info("compact tree index", zap.Int64("revision", rev)) ti.lg.Info("compact tree index", zap.Int64("revision", rev))
} else {
plog.Printf("store.index: compact %d", rev)
}
ti.Lock() ti.Lock()
clone := ti.tree.Clone() clone := ti.tree.Clone()
ti.Unlock() ti.Unlock()
@ -203,11 +199,7 @@ func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
if keyi.isEmpty() { if keyi.isEmpty() {
item := ti.tree.Delete(keyi) item := ti.tree.Delete(keyi)
if item == nil { if item == nil {
if ti.lg != nil {
ti.lg.Panic("failed to delete during compaction") ti.lg.Panic("failed to delete during compaction")
} else {
plog.Panic("store.index: unexpected delete failure during compaction")
}
} }
} }
ti.Unlock() ti.Unlock()

View File

@ -78,7 +78,6 @@ func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
rev := revision{main: main, sub: sub} rev := revision{main: main, sub: sub}
if !rev.GreaterThan(ki.modified) { if !rev.GreaterThan(ki.modified) {
if lg != nil {
lg.Panic( lg.Panic(
"'put' with an unexpected smaller revision", "'put' with an unexpected smaller revision",
zap.Int64("given-revision-main", rev.main), zap.Int64("given-revision-main", rev.main),
@ -86,9 +85,6 @@ func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
zap.Int64("modified-revision-main", ki.modified.main), zap.Int64("modified-revision-main", ki.modified.main),
zap.Int64("modified-revision-sub", ki.modified.sub), zap.Int64("modified-revision-sub", ki.modified.sub),
) )
} else {
plog.Panicf("store.keyindex: put with unexpected smaller revision [%v / %v]", rev, ki.modified)
}
} }
if len(ki.generations) == 0 { if len(ki.generations) == 0 {
ki.generations = append(ki.generations, generation{}) ki.generations = append(ki.generations, generation{})
@ -105,14 +101,10 @@ func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int64) { func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int64) {
if len(ki.generations) != 0 { if len(ki.generations) != 0 {
if lg != nil {
lg.Panic( lg.Panic(
"'restore' got an unexpected non-empty generations", "'restore' got an unexpected non-empty generations",
zap.Int("generations-size", len(ki.generations)), zap.Int("generations-size", len(ki.generations)),
) )
} else {
plog.Panicf("store.keyindex: cannot restore non-empty keyIndex")
}
} }
ki.modified = modified ki.modified = modified
@ -126,14 +118,10 @@ func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int6
// It returns ErrRevisionNotFound when tombstone on an empty generation. // It returns ErrRevisionNotFound when tombstone on an empty generation.
func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error { func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
if ki.isEmpty() { if ki.isEmpty() {
if lg != nil {
lg.Panic( lg.Panic(
"'tombstone' got an unexpected empty keyIndex", "'tombstone' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)), zap.String("key", string(ki.key)),
) )
} else {
plog.Panicf("store.keyindex: unexpected tombstone on empty keyIndex %s", string(ki.key))
}
} }
if ki.generations[len(ki.generations)-1].isEmpty() { if ki.generations[len(ki.generations)-1].isEmpty() {
return ErrRevisionNotFound return ErrRevisionNotFound
@ -148,14 +136,10 @@ func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
// Rev must be higher than or equal to the given atRev. // Rev must be higher than or equal to the given atRev.
func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) { func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
if ki.isEmpty() { if ki.isEmpty() {
if lg != nil {
lg.Panic( lg.Panic(
"'get' got an unexpected empty keyIndex", "'get' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)), zap.String("key", string(ki.key)),
) )
} else {
plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
}
} }
g := ki.findGeneration(atRev) g := ki.findGeneration(atRev)
if g.isEmpty() { if g.isEmpty() {
@ -175,14 +159,10 @@ func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision
// main revision. // main revision.
func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision { func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision {
if ki.isEmpty() { if ki.isEmpty() {
if lg != nil {
lg.Panic( lg.Panic(
"'since' got an unexpected empty keyIndex", "'since' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)), zap.String("key", string(ki.key)),
) )
} else {
plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
}
} }
since := revision{rev, 0} since := revision{rev, 0}
var gi int var gi int
@ -223,14 +203,10 @@ func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision {
// If a generation becomes empty during compaction, it will be removed. // If a generation becomes empty during compaction, it will be removed.
func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision]struct{}) { func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision]struct{}) {
if ki.isEmpty() { if ki.isEmpty() {
if lg != nil {
lg.Panic( lg.Panic(
"'compact' got an unexpected empty keyIndex", "'compact' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)), zap.String("key", string(ki.key)),
) )
} else {
plog.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key))
}
} }
genIdx, revIndex := ki.doCompact(atRev, available) genIdx, revIndex := ki.doCompact(atRev, available)

View File

@ -31,7 +31,6 @@ import (
"go.etcd.io/etcd/pkg/schedule" "go.etcd.io/etcd/pkg/schedule"
"go.etcd.io/etcd/pkg/traceutil" "go.etcd.io/etcd/pkg/traceutil"
"github.com/coreos/pkg/capnslog"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -46,8 +45,6 @@ var (
ErrCompacted = errors.New("mvcc: required revision has been compacted") ErrCompacted = errors.New("mvcc: required revision has been compacted")
ErrFutureRev = errors.New("mvcc: required revision is a future revision") ErrFutureRev = errors.New("mvcc: required revision is a future revision")
ErrCanceled = errors.New("mvcc: watcher is canceled") ErrCanceled = errors.New("mvcc: watcher is canceled")
plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "mvcc")
) )
const ( const (
@ -116,6 +113,9 @@ type store struct {
// NewStore returns a new store. It is useful to create a store inside // NewStore returns a new store. It is useful to create a store inside
// mvcc pkg. It should only be used for testing externally. // mvcc pkg. It should only be used for testing externally.
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *store { func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *store {
if lg == nil {
lg = zap.NewNop()
}
if cfg.CompactionBatchLimit == 0 { if cfg.CompactionBatchLimit == 0 {
cfg.CompactionBatchLimit = defaultCompactBatchLimit cfg.CompactionBatchLimit = defaultCompactBatchLimit
} }
@ -372,16 +372,12 @@ func (s *store) restore() error {
if len(finishedCompactBytes) != 0 { if len(finishedCompactBytes) != 0 {
s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
if s.lg != nil {
s.lg.Info( s.lg.Info(
"restored last compact revision", "restored last compact revision",
zap.String("meta-bucket-name", string(metaBucketName)), zap.String("meta-bucket-name", string(metaBucketName)),
zap.String("meta-bucket-name-key", string(finishedCompactKeyName)), zap.String("meta-bucket-name-key", string(finishedCompactKeyName)),
zap.Int64("restored-compact-revision", s.compactMainRev), zap.Int64("restored-compact-revision", s.compactMainRev),
) )
} else {
plog.Printf("restore compact to %d", s.compactMainRev)
}
} }
_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0) _, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
scheduledCompact := int64(0) scheduledCompact := int64(0)
@ -429,15 +425,11 @@ func (s *store) restore() error {
} }
err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}}) err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
if err != nil { if err != nil {
if s.lg != nil { s.lg.Error(
s.lg.Warn(
"failed to attach a lease", "failed to attach a lease",
zap.String("lease-id", fmt.Sprintf("%016x", lid)), zap.String("lease-id", fmt.Sprintf("%016x", lid)),
zap.Error(err), zap.Error(err),
) )
} else {
plog.Errorf("unexpected Attach error: %v", err)
}
} }
} }
@ -445,19 +437,15 @@ func (s *store) restore() error {
if scheduledCompact != 0 { if scheduledCompact != 0 {
if _, err := s.compactLockfree(scheduledCompact); err != nil { if _, err := s.compactLockfree(scheduledCompact); err != nil {
plog.Warningf("compaction encountered: %v", err) s.lg.Warn("compaction encountered error", zap.Error(err))
} }
if s.lg != nil {
s.lg.Info( s.lg.Info(
"resume scheduled compaction", "resume scheduled compaction",
zap.String("meta-bucket-name", string(metaBucketName)), zap.String("meta-bucket-name", string(metaBucketName)),
zap.String("meta-bucket-name-key", string(scheduledCompactKeyName)), zap.String("meta-bucket-name-key", string(scheduledCompactKeyName)),
zap.Int64("scheduled-compact-revision", scheduledCompact), zap.Int64("scheduled-compact-revision", scheduledCompact),
) )
} else {
plog.Printf("resume scheduled compaction at %d", scheduledCompact)
}
} }
return nil return nil
@ -501,7 +489,7 @@ func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int
if ok { if ok {
if isTombstone(rkv.key) { if isTombstone(rkv.key) {
if err := ki.tombstone(lg, rev.main, rev.sub); err != nil { if err := ki.tombstone(lg, rev.main, rev.sub); err != nil {
plog.Warningf("tombstone encountered: %v", err) lg.Warn("tombstone encountered error", zap.Error(err))
} }
continue continue
} }
@ -520,11 +508,7 @@ func restoreChunk(lg *zap.Logger, kvc chan<- revKeyValue, keys, vals [][]byte, k
for i, key := range keys { for i, key := range keys {
rkv := revKeyValue{key: key} rkv := revKeyValue{key: key}
if err := rkv.kv.Unmarshal(vals[i]); err != nil { if err := rkv.kv.Unmarshal(vals[i]); err != nil {
if lg != nil {
lg.Fatal("failed to unmarshal mvccpb.KeyValue", zap.Error(err)) lg.Fatal("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
} else {
plog.Fatalf("cannot unmarshal event: %v", err)
}
} }
rkv.kstr = string(rkv.kv.Key) rkv.kstr = string(rkv.kv.Key)
if isTombstone(key) { if isTombstone(key) {
@ -606,15 +590,11 @@ func (s *store) setupMetricsReporter() {
// appendMarkTombstone appends tombstone mark to normal revision bytes. // appendMarkTombstone appends tombstone mark to normal revision bytes.
func appendMarkTombstone(lg *zap.Logger, b []byte) []byte { func appendMarkTombstone(lg *zap.Logger, b []byte) []byte {
if len(b) != revBytesLen { if len(b) != revBytesLen {
if lg != nil {
lg.Panic( lg.Panic(
"cannot append tombstone mark to non-normal revision bytes", "cannot append tombstone mark to non-normal revision bytes",
zap.Int("expected-revision-bytes-size", revBytesLen), zap.Int("expected-revision-bytes-size", revBytesLen),
zap.Int("given-revision-bytes-size", len(b)), zap.Int("given-revision-bytes-size", len(b)),
) )
} else {
plog.Panicf("cannot append mark to non normal revision bytes")
}
} }
return append(b, markTombstone) return append(b, markTombstone)
} }

View File

@ -52,15 +52,11 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
revToBytes(revision{main: compactMainRev}, rbytes) revToBytes(revision{main: compactMainRev}, rbytes)
tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes) tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes)
tx.Unlock() tx.Unlock()
if s.lg != nil {
s.lg.Info( s.lg.Info(
"finished scheduled compaction", "finished scheduled compaction",
zap.Int64("compact-revision", compactMainRev), zap.Int64("compact-revision", compactMainRev),
zap.Duration("took", time.Since(totalStart)), zap.Duration("took", time.Since(totalStart)),
) )
} else {
plog.Infof("finished scheduled compaction at %d (took %v)", compactMainRev, time.Since(totalStart))
}
return true return true
} }

View File

@ -146,25 +146,17 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
revToBytes(revpair, revBytes) revToBytes(revpair, revBytes)
_, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0) _, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0)
if len(vs) != 1 { if len(vs) != 1 {
if tr.s.lg != nil {
tr.s.lg.Fatal( tr.s.lg.Fatal(
"range failed to find revision pair", "range failed to find revision pair",
zap.Int64("revision-main", revpair.main), zap.Int64("revision-main", revpair.main),
zap.Int64("revision-sub", revpair.sub), zap.Int64("revision-sub", revpair.sub),
) )
} else {
plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
}
} }
if err := kvs[i].Unmarshal(vs[0]); err != nil { if err := kvs[i].Unmarshal(vs[0]); err != nil {
if tr.s.lg != nil {
tr.s.lg.Fatal( tr.s.lg.Fatal(
"failed to unmarshal mvccpb.KeyValue", "failed to unmarshal mvccpb.KeyValue",
zap.Error(err), zap.Error(err),
) )
} else {
plog.Fatalf("cannot unmarshal event: %v", err)
}
} }
} }
tr.trace.Step("range keys from bolt db") tr.trace.Step("range keys from bolt db")
@ -200,14 +192,10 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
d, err := kv.Marshal() d, err := kv.Marshal()
if err != nil { if err != nil {
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal( tw.storeTxnRead.s.lg.Fatal(
"failed to marshal mvccpb.KeyValue", "failed to marshal mvccpb.KeyValue",
zap.Error(err), zap.Error(err),
) )
} else {
plog.Fatalf("cannot marshal event: %v", err)
}
} }
tw.trace.Step("marshal mvccpb.KeyValue") tw.trace.Step("marshal mvccpb.KeyValue")
@ -222,14 +210,10 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
} }
err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}}) err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
if err != nil { if err != nil {
if tw.storeTxnRead.s.lg != nil { tw.storeTxnRead.s.lg.Error(
tw.storeTxnRead.s.lg.Fatal(
"failed to detach old lease from a key", "failed to detach old lease from a key",
zap.Error(err), zap.Error(err),
) )
} else {
plog.Errorf("unexpected error from lease detach: %v", err)
}
} }
} }
if leaseID != lease.NoLease { if leaseID != lease.NoLease {
@ -264,39 +248,26 @@ func (tw *storeTxnWrite) delete(key []byte) {
idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))} idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
revToBytes(idxRev, ibytes) revToBytes(idxRev, ibytes)
if tw.storeTxnRead.s != nil && tw.storeTxnRead.s.lg != nil {
ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes) ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)
} else {
// TODO: remove this in v3.5
ibytes = appendMarkTombstone(nil, ibytes)
}
kv := mvccpb.KeyValue{Key: key} kv := mvccpb.KeyValue{Key: key}
d, err := kv.Marshal() d, err := kv.Marshal()
if err != nil { if err != nil {
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal( tw.storeTxnRead.s.lg.Fatal(
"failed to marshal mvccpb.KeyValue", "failed to marshal mvccpb.KeyValue",
zap.Error(err), zap.Error(err),
) )
} else {
plog.Fatalf("cannot marshal event: %v", err)
}
} }
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d) tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
err = tw.s.kvindex.Tombstone(key, idxRev) err = tw.s.kvindex.Tombstone(key, idxRev)
if err != nil { if err != nil {
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal( tw.storeTxnRead.s.lg.Fatal(
"failed to tombstone an existing key", "failed to tombstone an existing key",
zap.String("key", string(key)), zap.String("key", string(key)),
zap.Error(err), zap.Error(err),
) )
} else {
plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
}
} }
tw.changes = append(tw.changes, kv) tw.changes = append(tw.changes, kv)
@ -306,14 +277,10 @@ func (tw *storeTxnWrite) delete(key []byte) {
if leaseID != lease.NoLease { if leaseID != lease.NoLease {
err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item}) err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item})
if err != nil { if err != nil {
if tw.storeTxnRead.s.lg != nil { tw.storeTxnRead.s.lg.Error(
tw.storeTxnRead.s.lg.Fatal(
"failed to detach old lease from a key", "failed to detach old lease from a key",
zap.Error(err), zap.Error(err),
) )
} else {
plog.Errorf("cannot detach %v", err)
}
} }
} }
} }

View File

@ -74,6 +74,9 @@ func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexG
} }
func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore { func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore {
if lg == nil {
lg = zap.NewNop()
}
s := &watchableStore{ s := &watchableStore{
store: NewStore(lg, b, le, ig, cfg), store: NewStore(lg, b, le, ig, cfg),
victimc: make(chan struct{}, 1), victimc: make(chan struct{}, 1),
@ -351,12 +354,7 @@ func (s *watchableStore) syncWatchers() int {
tx.RLock() tx.RLock()
revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0) revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
var evs []mvccpb.Event var evs []mvccpb.Event
if s.store != nil && s.store.lg != nil {
evs = kvsToEvents(s.store.lg, wg, revs, vs) evs = kvsToEvents(s.store.lg, wg, revs, vs)
} else {
// TODO: remove this in v3.5
evs = kvsToEvents(nil, wg, revs, vs)
}
tx.RUnlock() tx.RUnlock()
var victims watcherBatch var victims watcherBatch
@ -412,11 +410,7 @@ func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []m
for i, v := range vals { for i, v := range vals {
var kv mvccpb.KeyValue var kv mvccpb.KeyValue
if err := kv.Unmarshal(v); err != nil { if err := kv.Unmarshal(v); err != nil {
if lg != nil {
lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err)) lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
} else {
plog.Panicf("cannot unmarshal event: %v", err)
}
} }
if !wg.contains(string(kv.Key)) { if !wg.contains(string(kv.Key)) {
@ -440,14 +434,10 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
var victim watcherBatch var victim watcherBatch
for w, eb := range newWatcherBatch(&s.synced, evs) { for w, eb := range newWatcherBatch(&s.synced, evs) {
if eb.revs != 1 { if eb.revs != 1 {
if s.store != nil && s.store.lg != nil {
s.store.lg.Panic( s.store.lg.Panic(
"unexpected multiple revisions in watch notification", "unexpected multiple revisions in watch notification",
zap.Int("number-of-revisions", eb.revs), zap.Int("number-of-revisions", eb.revs),
) )
} else {
plog.Panicf("unexpected multiple revisions in notification")
}
} }
if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) { if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
pendingEventsGauge.Add(float64(len(eb.evs))) pendingEventsGauge.Add(float64(len(eb.evs)))