diff --git a/mvcc/backend/backend.go b/mvcc/backend/backend.go index e4a55aefb..63a4a2e99 100644 --- a/mvcc/backend/backend.go +++ b/mvcc/backend/backend.go @@ -25,7 +25,6 @@ import ( "sync/atomic" "time" - "github.com/coreos/pkg/capnslog" humanize "github.com/dustin/go-humanize" bolt "go.etcd.io/bbolt" "go.uber.org/zap" @@ -42,8 +41,6 @@ var ( // This only works for linux. initialMmapSize = uint64(10 * 1024 * 1024 * 1024) - plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "mvcc/backend") - // minSnapshotWarningTimeout is the minimum threshold to trigger a long running snapshot warning. minSnapshotWarningTimeout = 30 * time.Second ) @@ -144,6 +141,10 @@ func NewDefaultBackend(path string) Backend { } func newBackend(bcfg BackendConfig) *backend { + if bcfg.Logger == nil { + bcfg.Logger = zap.NewNop() + } + bopts := &bolt.Options{} if boltOpenOptions != nil { *bopts = *boltOpenOptions @@ -153,11 +154,7 @@ func newBackend(bcfg BackendConfig) *backend { db, err := bolt.Open(bcfg.Path, 0600, bopts) if err != nil { - if bcfg.Logger != nil { - bcfg.Logger.Panic("failed to open database", zap.String("path", bcfg.Path), zap.Error(err)) - } else { - plog.Panicf("cannot open database at %s (%v)", bcfg.Path, err) - } + bcfg.Logger.Panic("failed to open database", zap.String("path", bcfg.Path), zap.Error(err)) } // In future, may want to make buffering optional for low-concurrency systems @@ -225,11 +222,7 @@ func (b *backend) Snapshot() Snapshot { defer b.mu.RUnlock() tx, err := b.db.Begin(false) if err != nil { - if b.lg != nil { - b.lg.Fatal("failed to begin tx", zap.Error(err)) - } else { - plog.Fatalf("cannot begin tx (%s)", err) - } + b.lg.Fatal("failed to begin tx", zap.Error(err)) } stopc, donec := make(chan struct{}), make(chan struct{}) @@ -249,16 +242,12 @@ func (b *backend) Snapshot() Snapshot { for { select { case <-ticker.C: - if b.lg != nil { - b.lg.Warn( - "snapshotting taking too long to transfer", - zap.Duration("taking", time.Since(start)), - zap.Int64("bytes", dbBytes), - zap.String("size", humanize.Bytes(uint64(dbBytes))), - ) - } else { - plog.Warningf("snapshotting is taking more than %v seconds to finish transferring %v MB [started at %v]", time.Since(start).Seconds(), float64(dbBytes)/float64(1024*1024), start) - } + b.lg.Warn( + "snapshotting taking too long to transfer", + zap.Duration("taking", time.Since(start)), + zap.Int64("bytes", dbBytes), + zap.String("size", humanize.Bytes(uint64(dbBytes))), + ) case <-stopc: snapshotTransferSec.Observe(time.Since(start).Seconds()) @@ -392,47 +381,27 @@ func (b *backend) defrag() error { if err != nil { tmpdb.Close() if rmErr := os.RemoveAll(tmpdb.Path()); rmErr != nil { - if b.lg != nil { - b.lg.Error("failed to remove dirs under tmpdb", zap.Error(rmErr)) - } else { - plog.Errorf("failed to remove dirs under tmpdb (%s)", rmErr) - } + b.lg.Error("failed to remove dirs under tmpdb", zap.Error(rmErr)) } return err } err = b.db.Close() if err != nil { - if b.lg != nil { - b.lg.Fatal("failed to close database", zap.Error(err)) - } else { - plog.Fatalf("cannot close database (%s)", err) - } + b.lg.Fatal("failed to close database", zap.Error(err)) } err = tmpdb.Close() if err != nil { - if b.lg != nil { - b.lg.Fatal("failed to close tmp database", zap.Error(err)) - } else { - plog.Fatalf("cannot close database (%s)", err) - } + b.lg.Fatal("failed to close tmp database", zap.Error(err)) } err = os.Rename(tdbp, dbp) if err != nil { - if b.lg != nil { - b.lg.Fatal("failed to rename tmp database", zap.Error(err)) - } else { - plog.Fatalf("cannot rename database (%s)", err) - } + b.lg.Fatal("failed to rename tmp database", zap.Error(err)) } b.db, err = bolt.Open(dbp, 0600, boltOpenOptions) if err != nil { - if b.lg != nil { - b.lg.Fatal("failed to open database", zap.String("path", dbp), zap.Error(err)) - } else { - plog.Panicf("cannot open database at %s (%v)", dbp, err) - } + b.lg.Fatal("failed to open database", zap.String("path", dbp), zap.Error(err)) } b.batchTx.tx = b.unsafeBegin(true) @@ -541,11 +510,7 @@ func (b *backend) begin(write bool) *bolt.Tx { func (b *backend) unsafeBegin(write bool) *bolt.Tx { tx, err := b.db.Begin(write) if err != nil { - if b.lg != nil { - b.lg.Fatal("failed to begin tx", zap.Error(err)) - } else { - plog.Fatalf("cannot begin tx (%s)", err) - } + b.lg.Fatal("failed to begin tx", zap.Error(err)) } return tx } diff --git a/mvcc/backend/batch_tx.go b/mvcc/backend/batch_tx.go index d5c8a88c3..eb75c29fc 100644 --- a/mvcc/backend/batch_tx.go +++ b/mvcc/backend/batch_tx.go @@ -71,15 +71,11 @@ func (t *batchTx) RUnlock() { func (t *batchTx) UnsafeCreateBucket(name []byte) { _, err := t.tx.CreateBucket(name) if err != nil && err != bolt.ErrBucketExists { - if t.backend.lg != nil { - t.backend.lg.Fatal( - "failed to create a bucket", - zap.String("bucket-name", string(name)), - zap.Error(err), - ) - } else { - plog.Fatalf("cannot create bucket %s (%v)", name, err) - } + t.backend.lg.Fatal( + "failed to create a bucket", + zap.String("bucket-name", string(name)), + zap.Error(err), + ) } t.pending++ } @@ -97,14 +93,10 @@ func (t *batchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) { func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) { bucket := t.tx.Bucket(bucketName) if bucket == nil { - if t.backend.lg != nil { - t.backend.lg.Fatal( - "failed to find a bucket", - zap.String("bucket-name", string(bucketName)), - ) - } else { - plog.Fatalf("bucket %s does not exist", bucketName) - } + t.backend.lg.Fatal( + "failed to find a bucket", + zap.String("bucket-name", string(bucketName)), + ) } if seq { // it is useful to increase fill percent when the workloads are mostly append-only. @@ -112,15 +104,11 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo bucket.FillPercent = 0.9 } if err := bucket.Put(key, value); err != nil { - if t.backend.lg != nil { - t.backend.lg.Fatal( - "failed to write to a bucket", - zap.String("bucket-name", string(bucketName)), - zap.Error(err), - ) - } else { - plog.Fatalf("cannot put key into bucket (%v)", err) - } + t.backend.lg.Fatal( + "failed to write to a bucket", + zap.String("bucket-name", string(bucketName)), + zap.Error(err), + ) } t.pending++ } @@ -129,14 +117,10 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) { bucket := t.tx.Bucket(bucketName) if bucket == nil { - if t.backend.lg != nil { - t.backend.lg.Fatal( - "failed to find a bucket", - zap.String("bucket-name", string(bucketName)), - ) - } else { - plog.Fatalf("bucket %s does not exist", bucketName) - } + t.backend.lg.Fatal( + "failed to find a bucket", + zap.String("bucket-name", string(bucketName)), + ) } return unsafeRange(bucket.Cursor(), key, endKey, limit) } @@ -167,26 +151,18 @@ func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) { bucket := t.tx.Bucket(bucketName) if bucket == nil { - if t.backend.lg != nil { - t.backend.lg.Fatal( - "failed to find a bucket", - zap.String("bucket-name", string(bucketName)), - ) - } else { - plog.Fatalf("bucket %s does not exist", bucketName) - } + t.backend.lg.Fatal( + "failed to find a bucket", + zap.String("bucket-name", string(bucketName)), + ) } err := bucket.Delete(key) if err != nil { - if t.backend.lg != nil { - t.backend.lg.Fatal( - "failed to delete a key", - zap.String("bucket-name", string(bucketName)), - zap.Error(err), - ) - } else { - plog.Fatalf("cannot delete key from bucket (%v)", err) - } + t.backend.lg.Fatal( + "failed to delete a key", + zap.String("bucket-name", string(bucketName)), + zap.Error(err), + ) } t.pending++ } @@ -244,11 +220,7 @@ func (t *batchTx) commit(stop bool) { t.pending = 0 if err != nil { - if t.backend.lg != nil { - t.backend.lg.Fatal("failed to commit tx", zap.Error(err)) - } else { - plog.Fatalf("cannot commit tx (%s)", err) - } + t.backend.lg.Fatal("failed to commit tx", zap.Error(err)) } } if !stop { @@ -311,11 +283,7 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) { go func(tx *bolt.Tx, wg *sync.WaitGroup) { wg.Wait() if err := tx.Rollback(); err != nil { - if t.backend.lg != nil { - t.backend.lg.Fatal("failed to rollback tx", zap.Error(err)) - } else { - plog.Fatalf("cannot rollback tx (%s)", err) - } + t.backend.lg.Fatal("failed to rollback tx", zap.Error(err)) } }(t.backend.readTx.tx, t.backend.readTx.txWg) t.backend.readTx.reset() diff --git a/mvcc/index.go b/mvcc/index.go index f8cc6df88..c9b0d1831 100644 --- a/mvcc/index.go +++ b/mvcc/index.go @@ -185,11 +185,7 @@ func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision { func (ti *treeIndex) Compact(rev int64) map[revision]struct{} { available := make(map[revision]struct{}) - if ti.lg != nil { - ti.lg.Info("compact tree index", zap.Int64("revision", rev)) - } else { - plog.Printf("store.index: compact %d", rev) - } + ti.lg.Info("compact tree index", zap.Int64("revision", rev)) ti.Lock() clone := ti.tree.Clone() ti.Unlock() @@ -203,11 +199,7 @@ func (ti *treeIndex) Compact(rev int64) map[revision]struct{} { if keyi.isEmpty() { item := ti.tree.Delete(keyi) if item == nil { - if ti.lg != nil { - ti.lg.Panic("failed to delete during compaction") - } else { - plog.Panic("store.index: unexpected delete failure during compaction") - } + ti.lg.Panic("failed to delete during compaction") } } ti.Unlock() diff --git a/mvcc/key_index.go b/mvcc/key_index.go index cf77cb438..58ad4832e 100644 --- a/mvcc/key_index.go +++ b/mvcc/key_index.go @@ -78,17 +78,13 @@ func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) { rev := revision{main: main, sub: sub} if !rev.GreaterThan(ki.modified) { - if lg != nil { - lg.Panic( - "'put' with an unexpected smaller revision", - zap.Int64("given-revision-main", rev.main), - zap.Int64("given-revision-sub", rev.sub), - zap.Int64("modified-revision-main", ki.modified.main), - zap.Int64("modified-revision-sub", ki.modified.sub), - ) - } else { - plog.Panicf("store.keyindex: put with unexpected smaller revision [%v / %v]", rev, ki.modified) - } + lg.Panic( + "'put' with an unexpected smaller revision", + zap.Int64("given-revision-main", rev.main), + zap.Int64("given-revision-sub", rev.sub), + zap.Int64("modified-revision-main", ki.modified.main), + zap.Int64("modified-revision-sub", ki.modified.sub), + ) } if len(ki.generations) == 0 { ki.generations = append(ki.generations, generation{}) @@ -105,14 +101,10 @@ func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) { func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int64) { if len(ki.generations) != 0 { - if lg != nil { - lg.Panic( - "'restore' got an unexpected non-empty generations", - zap.Int("generations-size", len(ki.generations)), - ) - } else { - plog.Panicf("store.keyindex: cannot restore non-empty keyIndex") - } + lg.Panic( + "'restore' got an unexpected non-empty generations", + zap.Int("generations-size", len(ki.generations)), + ) } ki.modified = modified @@ -126,14 +118,10 @@ func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int6 // It returns ErrRevisionNotFound when tombstone on an empty generation. func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error { if ki.isEmpty() { - if lg != nil { - lg.Panic( - "'tombstone' got an unexpected empty keyIndex", - zap.String("key", string(ki.key)), - ) - } else { - plog.Panicf("store.keyindex: unexpected tombstone on empty keyIndex %s", string(ki.key)) - } + lg.Panic( + "'tombstone' got an unexpected empty keyIndex", + zap.String("key", string(ki.key)), + ) } if ki.generations[len(ki.generations)-1].isEmpty() { return ErrRevisionNotFound @@ -148,14 +136,10 @@ func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error { // Rev must be higher than or equal to the given atRev. func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) { if ki.isEmpty() { - if lg != nil { - lg.Panic( - "'get' got an unexpected empty keyIndex", - zap.String("key", string(ki.key)), - ) - } else { - plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key)) - } + lg.Panic( + "'get' got an unexpected empty keyIndex", + zap.String("key", string(ki.key)), + ) } g := ki.findGeneration(atRev) if g.isEmpty() { @@ -175,14 +159,10 @@ func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision // main revision. func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision { if ki.isEmpty() { - if lg != nil { - lg.Panic( - "'since' got an unexpected empty keyIndex", - zap.String("key", string(ki.key)), - ) - } else { - plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key)) - } + lg.Panic( + "'since' got an unexpected empty keyIndex", + zap.String("key", string(ki.key)), + ) } since := revision{rev, 0} var gi int @@ -223,14 +203,10 @@ func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision { // If a generation becomes empty during compaction, it will be removed. func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision]struct{}) { if ki.isEmpty() { - if lg != nil { - lg.Panic( - "'compact' got an unexpected empty keyIndex", - zap.String("key", string(ki.key)), - ) - } else { - plog.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key)) - } + lg.Panic( + "'compact' got an unexpected empty keyIndex", + zap.String("key", string(ki.key)), + ) } genIdx, revIndex := ki.doCompact(atRev, available) diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index 2bd28b0b5..7d3a54e33 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -31,7 +31,6 @@ import ( "go.etcd.io/etcd/pkg/schedule" "go.etcd.io/etcd/pkg/traceutil" - "github.com/coreos/pkg/capnslog" "go.uber.org/zap" ) @@ -46,8 +45,6 @@ var ( ErrCompacted = errors.New("mvcc: required revision has been compacted") ErrFutureRev = errors.New("mvcc: required revision is a future revision") ErrCanceled = errors.New("mvcc: watcher is canceled") - - plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "mvcc") ) const ( @@ -116,6 +113,9 @@ type store struct { // NewStore returns a new store. It is useful to create a store inside // mvcc pkg. It should only be used for testing externally. func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *store { + if lg == nil { + lg = zap.NewNop() + } if cfg.CompactionBatchLimit == 0 { cfg.CompactionBatchLimit = defaultCompactBatchLimit } @@ -372,16 +372,12 @@ func (s *store) restore() error { if len(finishedCompactBytes) != 0 { s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main - if s.lg != nil { - s.lg.Info( - "restored last compact revision", - zap.String("meta-bucket-name", string(metaBucketName)), - zap.String("meta-bucket-name-key", string(finishedCompactKeyName)), - zap.Int64("restored-compact-revision", s.compactMainRev), - ) - } else { - plog.Printf("restore compact to %d", s.compactMainRev) - } + s.lg.Info( + "restored last compact revision", + zap.String("meta-bucket-name", string(metaBucketName)), + zap.String("meta-bucket-name-key", string(finishedCompactKeyName)), + zap.Int64("restored-compact-revision", s.compactMainRev), + ) } _, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0) scheduledCompact := int64(0) @@ -429,15 +425,11 @@ func (s *store) restore() error { } err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}}) if err != nil { - if s.lg != nil { - s.lg.Warn( - "failed to attach a lease", - zap.String("lease-id", fmt.Sprintf("%016x", lid)), - zap.Error(err), - ) - } else { - plog.Errorf("unexpected Attach error: %v", err) - } + s.lg.Error( + "failed to attach a lease", + zap.String("lease-id", fmt.Sprintf("%016x", lid)), + zap.Error(err), + ) } } @@ -445,19 +437,15 @@ func (s *store) restore() error { if scheduledCompact != 0 { if _, err := s.compactLockfree(scheduledCompact); err != nil { - plog.Warningf("compaction encountered: %v", err) + s.lg.Warn("compaction encountered error", zap.Error(err)) } - if s.lg != nil { - s.lg.Info( - "resume scheduled compaction", - zap.String("meta-bucket-name", string(metaBucketName)), - zap.String("meta-bucket-name-key", string(scheduledCompactKeyName)), - zap.Int64("scheduled-compact-revision", scheduledCompact), - ) - } else { - plog.Printf("resume scheduled compaction at %d", scheduledCompact) - } + s.lg.Info( + "resume scheduled compaction", + zap.String("meta-bucket-name", string(metaBucketName)), + zap.String("meta-bucket-name-key", string(scheduledCompactKeyName)), + zap.Int64("scheduled-compact-revision", scheduledCompact), + ) } return nil @@ -501,7 +489,7 @@ func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int if ok { if isTombstone(rkv.key) { if err := ki.tombstone(lg, rev.main, rev.sub); err != nil { - plog.Warningf("tombstone encountered: %v", err) + lg.Warn("tombstone encountered error", zap.Error(err)) } continue } @@ -520,11 +508,7 @@ func restoreChunk(lg *zap.Logger, kvc chan<- revKeyValue, keys, vals [][]byte, k for i, key := range keys { rkv := revKeyValue{key: key} if err := rkv.kv.Unmarshal(vals[i]); err != nil { - if lg != nil { - lg.Fatal("failed to unmarshal mvccpb.KeyValue", zap.Error(err)) - } else { - plog.Fatalf("cannot unmarshal event: %v", err) - } + lg.Fatal("failed to unmarshal mvccpb.KeyValue", zap.Error(err)) } rkv.kstr = string(rkv.kv.Key) if isTombstone(key) { @@ -606,15 +590,11 @@ func (s *store) setupMetricsReporter() { // appendMarkTombstone appends tombstone mark to normal revision bytes. func appendMarkTombstone(lg *zap.Logger, b []byte) []byte { if len(b) != revBytesLen { - if lg != nil { - lg.Panic( - "cannot append tombstone mark to non-normal revision bytes", - zap.Int("expected-revision-bytes-size", revBytesLen), - zap.Int("given-revision-bytes-size", len(b)), - ) - } else { - plog.Panicf("cannot append mark to non normal revision bytes") - } + lg.Panic( + "cannot append tombstone mark to non-normal revision bytes", + zap.Int("expected-revision-bytes-size", revBytesLen), + zap.Int("given-revision-bytes-size", len(b)), + ) } return append(b, markTombstone) } diff --git a/mvcc/kvstore_compaction.go b/mvcc/kvstore_compaction.go index 4c6b062b4..0074c59e2 100644 --- a/mvcc/kvstore_compaction.go +++ b/mvcc/kvstore_compaction.go @@ -52,15 +52,11 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc revToBytes(revision{main: compactMainRev}, rbytes) tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes) tx.Unlock() - if s.lg != nil { - s.lg.Info( - "finished scheduled compaction", - zap.Int64("compact-revision", compactMainRev), - zap.Duration("took", time.Since(totalStart)), - ) - } else { - plog.Infof("finished scheduled compaction at %d (took %v)", compactMainRev, time.Since(totalStart)) - } + s.lg.Info( + "finished scheduled compaction", + zap.Int64("compact-revision", compactMainRev), + zap.Duration("took", time.Since(totalStart)), + ) return true } diff --git a/mvcc/kvstore_txn.go b/mvcc/kvstore_txn.go index 716a6d82f..e89ddbee4 100644 --- a/mvcc/kvstore_txn.go +++ b/mvcc/kvstore_txn.go @@ -146,25 +146,17 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions revToBytes(revpair, revBytes) _, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0) if len(vs) != 1 { - if tr.s.lg != nil { - tr.s.lg.Fatal( - "range failed to find revision pair", - zap.Int64("revision-main", revpair.main), - zap.Int64("revision-sub", revpair.sub), - ) - } else { - plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub) - } + tr.s.lg.Fatal( + "range failed to find revision pair", + zap.Int64("revision-main", revpair.main), + zap.Int64("revision-sub", revpair.sub), + ) } if err := kvs[i].Unmarshal(vs[0]); err != nil { - if tr.s.lg != nil { - tr.s.lg.Fatal( - "failed to unmarshal mvccpb.KeyValue", - zap.Error(err), - ) - } else { - plog.Fatalf("cannot unmarshal event: %v", err) - } + tr.s.lg.Fatal( + "failed to unmarshal mvccpb.KeyValue", + zap.Error(err), + ) } } tr.trace.Step("range keys from bolt db") @@ -200,14 +192,10 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) { d, err := kv.Marshal() if err != nil { - if tw.storeTxnRead.s.lg != nil { - tw.storeTxnRead.s.lg.Fatal( - "failed to marshal mvccpb.KeyValue", - zap.Error(err), - ) - } else { - plog.Fatalf("cannot marshal event: %v", err) - } + tw.storeTxnRead.s.lg.Fatal( + "failed to marshal mvccpb.KeyValue", + zap.Error(err), + ) } tw.trace.Step("marshal mvccpb.KeyValue") @@ -222,14 +210,10 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) { } err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}}) if err != nil { - if tw.storeTxnRead.s.lg != nil { - tw.storeTxnRead.s.lg.Fatal( - "failed to detach old lease from a key", - zap.Error(err), - ) - } else { - plog.Errorf("unexpected error from lease detach: %v", err) - } + tw.storeTxnRead.s.lg.Error( + "failed to detach old lease from a key", + zap.Error(err), + ) } } if leaseID != lease.NoLease { @@ -264,39 +248,26 @@ func (tw *storeTxnWrite) delete(key []byte) { idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))} revToBytes(idxRev, ibytes) - if tw.storeTxnRead.s != nil && tw.storeTxnRead.s.lg != nil { - ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes) - } else { - // TODO: remove this in v3.5 - ibytes = appendMarkTombstone(nil, ibytes) - } + ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes) kv := mvccpb.KeyValue{Key: key} d, err := kv.Marshal() if err != nil { - if tw.storeTxnRead.s.lg != nil { - tw.storeTxnRead.s.lg.Fatal( - "failed to marshal mvccpb.KeyValue", - zap.Error(err), - ) - } else { - plog.Fatalf("cannot marshal event: %v", err) - } + tw.storeTxnRead.s.lg.Fatal( + "failed to marshal mvccpb.KeyValue", + zap.Error(err), + ) } tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d) err = tw.s.kvindex.Tombstone(key, idxRev) if err != nil { - if tw.storeTxnRead.s.lg != nil { - tw.storeTxnRead.s.lg.Fatal( - "failed to tombstone an existing key", - zap.String("key", string(key)), - zap.Error(err), - ) - } else { - plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err) - } + tw.storeTxnRead.s.lg.Fatal( + "failed to tombstone an existing key", + zap.String("key", string(key)), + zap.Error(err), + ) } tw.changes = append(tw.changes, kv) @@ -306,14 +277,10 @@ func (tw *storeTxnWrite) delete(key []byte) { if leaseID != lease.NoLease { err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item}) if err != nil { - if tw.storeTxnRead.s.lg != nil { - tw.storeTxnRead.s.lg.Fatal( - "failed to detach old lease from a key", - zap.Error(err), - ) - } else { - plog.Errorf("cannot detach %v", err) - } + tw.storeTxnRead.s.lg.Error( + "failed to detach old lease from a key", + zap.Error(err), + ) } } } diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index dd11d3f5e..6cedd5b18 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -74,6 +74,9 @@ func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexG } func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore { + if lg == nil { + lg = zap.NewNop() + } s := &watchableStore{ store: NewStore(lg, b, le, ig, cfg), victimc: make(chan struct{}, 1), @@ -351,12 +354,7 @@ func (s *watchableStore) syncWatchers() int { tx.RLock() revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0) var evs []mvccpb.Event - if s.store != nil && s.store.lg != nil { - evs = kvsToEvents(s.store.lg, wg, revs, vs) - } else { - // TODO: remove this in v3.5 - evs = kvsToEvents(nil, wg, revs, vs) - } + evs = kvsToEvents(s.store.lg, wg, revs, vs) tx.RUnlock() var victims watcherBatch @@ -412,11 +410,7 @@ func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []m for i, v := range vals { var kv mvccpb.KeyValue if err := kv.Unmarshal(v); err != nil { - if lg != nil { - lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err)) - } else { - plog.Panicf("cannot unmarshal event: %v", err) - } + lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err)) } if !wg.contains(string(kv.Key)) { @@ -440,14 +434,10 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) { var victim watcherBatch for w, eb := range newWatcherBatch(&s.synced, evs) { if eb.revs != 1 { - if s.store != nil && s.store.lg != nil { - s.store.lg.Panic( - "unexpected multiple revisions in watch notification", - zap.Int("number-of-revisions", eb.revs), - ) - } else { - plog.Panicf("unexpected multiple revisions in notification") - } + s.store.lg.Panic( + "unexpected multiple revisions in watch notification", + zap.Int("number-of-revisions", eb.revs), + ) } if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) { pendingEventsGauge.Add(float64(len(eb.evs)))