Merge pull request #11595 from jingyih/remove_capnslog_in_mvcc

mvcc: remove capnslog
This commit is contained in:
Sahdev Zala 2020-02-07 13:01:04 -05:00 committed by GitHub
commit 2f2354bca2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 149 additions and 315 deletions

View File

@ -25,7 +25,6 @@ import (
"sync/atomic"
"time"
"github.com/coreos/pkg/capnslog"
humanize "github.com/dustin/go-humanize"
bolt "go.etcd.io/bbolt"
"go.uber.org/zap"
@ -42,8 +41,6 @@ var (
// This only works for linux.
initialMmapSize = uint64(10 * 1024 * 1024 * 1024)
plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "mvcc/backend")
// minSnapshotWarningTimeout is the minimum threshold to trigger a long running snapshot warning.
minSnapshotWarningTimeout = 30 * time.Second
)
@ -144,6 +141,10 @@ func NewDefaultBackend(path string) Backend {
}
func newBackend(bcfg BackendConfig) *backend {
if bcfg.Logger == nil {
bcfg.Logger = zap.NewNop()
}
bopts := &bolt.Options{}
if boltOpenOptions != nil {
*bopts = *boltOpenOptions
@ -153,11 +154,7 @@ func newBackend(bcfg BackendConfig) *backend {
db, err := bolt.Open(bcfg.Path, 0600, bopts)
if err != nil {
if bcfg.Logger != nil {
bcfg.Logger.Panic("failed to open database", zap.String("path", bcfg.Path), zap.Error(err))
} else {
plog.Panicf("cannot open database at %s (%v)", bcfg.Path, err)
}
bcfg.Logger.Panic("failed to open database", zap.String("path", bcfg.Path), zap.Error(err))
}
// In future, may want to make buffering optional for low-concurrency systems
@ -225,11 +222,7 @@ func (b *backend) Snapshot() Snapshot {
defer b.mu.RUnlock()
tx, err := b.db.Begin(false)
if err != nil {
if b.lg != nil {
b.lg.Fatal("failed to begin tx", zap.Error(err))
} else {
plog.Fatalf("cannot begin tx (%s)", err)
}
b.lg.Fatal("failed to begin tx", zap.Error(err))
}
stopc, donec := make(chan struct{}), make(chan struct{})
@ -249,16 +242,12 @@ func (b *backend) Snapshot() Snapshot {
for {
select {
case <-ticker.C:
if b.lg != nil {
b.lg.Warn(
"snapshotting taking too long to transfer",
zap.Duration("taking", time.Since(start)),
zap.Int64("bytes", dbBytes),
zap.String("size", humanize.Bytes(uint64(dbBytes))),
)
} else {
plog.Warningf("snapshotting is taking more than %v seconds to finish transferring %v MB [started at %v]", time.Since(start).Seconds(), float64(dbBytes)/float64(1024*1024), start)
}
b.lg.Warn(
"snapshotting taking too long to transfer",
zap.Duration("taking", time.Since(start)),
zap.Int64("bytes", dbBytes),
zap.String("size", humanize.Bytes(uint64(dbBytes))),
)
case <-stopc:
snapshotTransferSec.Observe(time.Since(start).Seconds())
@ -392,47 +381,27 @@ func (b *backend) defrag() error {
if err != nil {
tmpdb.Close()
if rmErr := os.RemoveAll(tmpdb.Path()); rmErr != nil {
if b.lg != nil {
b.lg.Error("failed to remove dirs under tmpdb", zap.Error(rmErr))
} else {
plog.Errorf("failed to remove dirs under tmpdb (%s)", rmErr)
}
b.lg.Error("failed to remove dirs under tmpdb", zap.Error(rmErr))
}
return err
}
err = b.db.Close()
if err != nil {
if b.lg != nil {
b.lg.Fatal("failed to close database", zap.Error(err))
} else {
plog.Fatalf("cannot close database (%s)", err)
}
b.lg.Fatal("failed to close database", zap.Error(err))
}
err = tmpdb.Close()
if err != nil {
if b.lg != nil {
b.lg.Fatal("failed to close tmp database", zap.Error(err))
} else {
plog.Fatalf("cannot close database (%s)", err)
}
b.lg.Fatal("failed to close tmp database", zap.Error(err))
}
err = os.Rename(tdbp, dbp)
if err != nil {
if b.lg != nil {
b.lg.Fatal("failed to rename tmp database", zap.Error(err))
} else {
plog.Fatalf("cannot rename database (%s)", err)
}
b.lg.Fatal("failed to rename tmp database", zap.Error(err))
}
b.db, err = bolt.Open(dbp, 0600, boltOpenOptions)
if err != nil {
if b.lg != nil {
b.lg.Fatal("failed to open database", zap.String("path", dbp), zap.Error(err))
} else {
plog.Panicf("cannot open database at %s (%v)", dbp, err)
}
b.lg.Fatal("failed to open database", zap.String("path", dbp), zap.Error(err))
}
b.batchTx.tx = b.unsafeBegin(true)
@ -541,11 +510,7 @@ func (b *backend) begin(write bool) *bolt.Tx {
func (b *backend) unsafeBegin(write bool) *bolt.Tx {
tx, err := b.db.Begin(write)
if err != nil {
if b.lg != nil {
b.lg.Fatal("failed to begin tx", zap.Error(err))
} else {
plog.Fatalf("cannot begin tx (%s)", err)
}
b.lg.Fatal("failed to begin tx", zap.Error(err))
}
return tx
}

View File

@ -71,15 +71,11 @@ func (t *batchTx) RUnlock() {
func (t *batchTx) UnsafeCreateBucket(name []byte) {
_, err := t.tx.CreateBucket(name)
if err != nil && err != bolt.ErrBucketExists {
if t.backend.lg != nil {
t.backend.lg.Fatal(
"failed to create a bucket",
zap.String("bucket-name", string(name)),
zap.Error(err),
)
} else {
plog.Fatalf("cannot create bucket %s (%v)", name, err)
}
t.backend.lg.Fatal(
"failed to create a bucket",
zap.String("bucket-name", string(name)),
zap.Error(err),
)
}
t.pending++
}
@ -97,14 +93,10 @@ func (t *batchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
bucket := t.tx.Bucket(bucketName)
if bucket == nil {
if t.backend.lg != nil {
t.backend.lg.Fatal(
"failed to find a bucket",
zap.String("bucket-name", string(bucketName)),
)
} else {
plog.Fatalf("bucket %s does not exist", bucketName)
}
t.backend.lg.Fatal(
"failed to find a bucket",
zap.String("bucket-name", string(bucketName)),
)
}
if seq {
// it is useful to increase fill percent when the workloads are mostly append-only.
@ -112,15 +104,11 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo
bucket.FillPercent = 0.9
}
if err := bucket.Put(key, value); err != nil {
if t.backend.lg != nil {
t.backend.lg.Fatal(
"failed to write to a bucket",
zap.String("bucket-name", string(bucketName)),
zap.Error(err),
)
} else {
plog.Fatalf("cannot put key into bucket (%v)", err)
}
t.backend.lg.Fatal(
"failed to write to a bucket",
zap.String("bucket-name", string(bucketName)),
zap.Error(err),
)
}
t.pending++
}
@ -129,14 +117,10 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo
func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
bucket := t.tx.Bucket(bucketName)
if bucket == nil {
if t.backend.lg != nil {
t.backend.lg.Fatal(
"failed to find a bucket",
zap.String("bucket-name", string(bucketName)),
)
} else {
plog.Fatalf("bucket %s does not exist", bucketName)
}
t.backend.lg.Fatal(
"failed to find a bucket",
zap.String("bucket-name", string(bucketName)),
)
}
return unsafeRange(bucket.Cursor(), key, endKey, limit)
}
@ -167,26 +151,18 @@ func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte
func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
bucket := t.tx.Bucket(bucketName)
if bucket == nil {
if t.backend.lg != nil {
t.backend.lg.Fatal(
"failed to find a bucket",
zap.String("bucket-name", string(bucketName)),
)
} else {
plog.Fatalf("bucket %s does not exist", bucketName)
}
t.backend.lg.Fatal(
"failed to find a bucket",
zap.String("bucket-name", string(bucketName)),
)
}
err := bucket.Delete(key)
if err != nil {
if t.backend.lg != nil {
t.backend.lg.Fatal(
"failed to delete a key",
zap.String("bucket-name", string(bucketName)),
zap.Error(err),
)
} else {
plog.Fatalf("cannot delete key from bucket (%v)", err)
}
t.backend.lg.Fatal(
"failed to delete a key",
zap.String("bucket-name", string(bucketName)),
zap.Error(err),
)
}
t.pending++
}
@ -244,11 +220,7 @@ func (t *batchTx) commit(stop bool) {
t.pending = 0
if err != nil {
if t.backend.lg != nil {
t.backend.lg.Fatal("failed to commit tx", zap.Error(err))
} else {
plog.Fatalf("cannot commit tx (%s)", err)
}
t.backend.lg.Fatal("failed to commit tx", zap.Error(err))
}
}
if !stop {
@ -311,11 +283,7 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) {
go func(tx *bolt.Tx, wg *sync.WaitGroup) {
wg.Wait()
if err := tx.Rollback(); err != nil {
if t.backend.lg != nil {
t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
} else {
plog.Fatalf("cannot rollback tx (%s)", err)
}
t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
}
}(t.backend.readTx.tx, t.backend.readTx.txWg)
t.backend.readTx.reset()

View File

@ -185,11 +185,7 @@ func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision {
func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
available := make(map[revision]struct{})
if ti.lg != nil {
ti.lg.Info("compact tree index", zap.Int64("revision", rev))
} else {
plog.Printf("store.index: compact %d", rev)
}
ti.lg.Info("compact tree index", zap.Int64("revision", rev))
ti.Lock()
clone := ti.tree.Clone()
ti.Unlock()
@ -203,11 +199,7 @@ func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
if keyi.isEmpty() {
item := ti.tree.Delete(keyi)
if item == nil {
if ti.lg != nil {
ti.lg.Panic("failed to delete during compaction")
} else {
plog.Panic("store.index: unexpected delete failure during compaction")
}
ti.lg.Panic("failed to delete during compaction")
}
}
ti.Unlock()

View File

@ -78,17 +78,13 @@ func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
rev := revision{main: main, sub: sub}
if !rev.GreaterThan(ki.modified) {
if lg != nil {
lg.Panic(
"'put' with an unexpected smaller revision",
zap.Int64("given-revision-main", rev.main),
zap.Int64("given-revision-sub", rev.sub),
zap.Int64("modified-revision-main", ki.modified.main),
zap.Int64("modified-revision-sub", ki.modified.sub),
)
} else {
plog.Panicf("store.keyindex: put with unexpected smaller revision [%v / %v]", rev, ki.modified)
}
lg.Panic(
"'put' with an unexpected smaller revision",
zap.Int64("given-revision-main", rev.main),
zap.Int64("given-revision-sub", rev.sub),
zap.Int64("modified-revision-main", ki.modified.main),
zap.Int64("modified-revision-sub", ki.modified.sub),
)
}
if len(ki.generations) == 0 {
ki.generations = append(ki.generations, generation{})
@ -105,14 +101,10 @@ func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int64) {
if len(ki.generations) != 0 {
if lg != nil {
lg.Panic(
"'restore' got an unexpected non-empty generations",
zap.Int("generations-size", len(ki.generations)),
)
} else {
plog.Panicf("store.keyindex: cannot restore non-empty keyIndex")
}
lg.Panic(
"'restore' got an unexpected non-empty generations",
zap.Int("generations-size", len(ki.generations)),
)
}
ki.modified = modified
@ -126,14 +118,10 @@ func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int6
// It returns ErrRevisionNotFound when tombstone on an empty generation.
func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
if ki.isEmpty() {
if lg != nil {
lg.Panic(
"'tombstone' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)),
)
} else {
plog.Panicf("store.keyindex: unexpected tombstone on empty keyIndex %s", string(ki.key))
}
lg.Panic(
"'tombstone' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)),
)
}
if ki.generations[len(ki.generations)-1].isEmpty() {
return ErrRevisionNotFound
@ -148,14 +136,10 @@ func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
// Rev must be higher than or equal to the given atRev.
func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
if ki.isEmpty() {
if lg != nil {
lg.Panic(
"'get' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)),
)
} else {
plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
}
lg.Panic(
"'get' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)),
)
}
g := ki.findGeneration(atRev)
if g.isEmpty() {
@ -175,14 +159,10 @@ func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision
// main revision.
func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision {
if ki.isEmpty() {
if lg != nil {
lg.Panic(
"'since' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)),
)
} else {
plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
}
lg.Panic(
"'since' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)),
)
}
since := revision{rev, 0}
var gi int
@ -223,14 +203,10 @@ func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision {
// If a generation becomes empty during compaction, it will be removed.
func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision]struct{}) {
if ki.isEmpty() {
if lg != nil {
lg.Panic(
"'compact' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)),
)
} else {
plog.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key))
}
lg.Panic(
"'compact' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)),
)
}
genIdx, revIndex := ki.doCompact(atRev, available)

View File

@ -31,7 +31,6 @@ import (
"go.etcd.io/etcd/pkg/schedule"
"go.etcd.io/etcd/pkg/traceutil"
"github.com/coreos/pkg/capnslog"
"go.uber.org/zap"
)
@ -46,8 +45,6 @@ var (
ErrCompacted = errors.New("mvcc: required revision has been compacted")
ErrFutureRev = errors.New("mvcc: required revision is a future revision")
ErrCanceled = errors.New("mvcc: watcher is canceled")
plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "mvcc")
)
const (
@ -116,6 +113,9 @@ type store struct {
// NewStore returns a new store. It is useful to create a store inside
// mvcc pkg. It should only be used for testing externally.
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *store {
if lg == nil {
lg = zap.NewNop()
}
if cfg.CompactionBatchLimit == 0 {
cfg.CompactionBatchLimit = defaultCompactBatchLimit
}
@ -372,16 +372,12 @@ func (s *store) restore() error {
if len(finishedCompactBytes) != 0 {
s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
if s.lg != nil {
s.lg.Info(
"restored last compact revision",
zap.String("meta-bucket-name", string(metaBucketName)),
zap.String("meta-bucket-name-key", string(finishedCompactKeyName)),
zap.Int64("restored-compact-revision", s.compactMainRev),
)
} else {
plog.Printf("restore compact to %d", s.compactMainRev)
}
s.lg.Info(
"restored last compact revision",
zap.String("meta-bucket-name", string(metaBucketName)),
zap.String("meta-bucket-name-key", string(finishedCompactKeyName)),
zap.Int64("restored-compact-revision", s.compactMainRev),
)
}
_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
scheduledCompact := int64(0)
@ -429,15 +425,11 @@ func (s *store) restore() error {
}
err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
if err != nil {
if s.lg != nil {
s.lg.Warn(
"failed to attach a lease",
zap.String("lease-id", fmt.Sprintf("%016x", lid)),
zap.Error(err),
)
} else {
plog.Errorf("unexpected Attach error: %v", err)
}
s.lg.Error(
"failed to attach a lease",
zap.String("lease-id", fmt.Sprintf("%016x", lid)),
zap.Error(err),
)
}
}
@ -445,19 +437,15 @@ func (s *store) restore() error {
if scheduledCompact != 0 {
if _, err := s.compactLockfree(scheduledCompact); err != nil {
plog.Warningf("compaction encountered: %v", err)
s.lg.Warn("compaction encountered error", zap.Error(err))
}
if s.lg != nil {
s.lg.Info(
"resume scheduled compaction",
zap.String("meta-bucket-name", string(metaBucketName)),
zap.String("meta-bucket-name-key", string(scheduledCompactKeyName)),
zap.Int64("scheduled-compact-revision", scheduledCompact),
)
} else {
plog.Printf("resume scheduled compaction at %d", scheduledCompact)
}
s.lg.Info(
"resume scheduled compaction",
zap.String("meta-bucket-name", string(metaBucketName)),
zap.String("meta-bucket-name-key", string(scheduledCompactKeyName)),
zap.Int64("scheduled-compact-revision", scheduledCompact),
)
}
return nil
@ -501,7 +489,7 @@ func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int
if ok {
if isTombstone(rkv.key) {
if err := ki.tombstone(lg, rev.main, rev.sub); err != nil {
plog.Warningf("tombstone encountered: %v", err)
lg.Warn("tombstone encountered error", zap.Error(err))
}
continue
}
@ -520,11 +508,7 @@ func restoreChunk(lg *zap.Logger, kvc chan<- revKeyValue, keys, vals [][]byte, k
for i, key := range keys {
rkv := revKeyValue{key: key}
if err := rkv.kv.Unmarshal(vals[i]); err != nil {
if lg != nil {
lg.Fatal("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
} else {
plog.Fatalf("cannot unmarshal event: %v", err)
}
lg.Fatal("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
}
rkv.kstr = string(rkv.kv.Key)
if isTombstone(key) {
@ -606,15 +590,11 @@ func (s *store) setupMetricsReporter() {
// appendMarkTombstone appends tombstone mark to normal revision bytes.
func appendMarkTombstone(lg *zap.Logger, b []byte) []byte {
if len(b) != revBytesLen {
if lg != nil {
lg.Panic(
"cannot append tombstone mark to non-normal revision bytes",
zap.Int("expected-revision-bytes-size", revBytesLen),
zap.Int("given-revision-bytes-size", len(b)),
)
} else {
plog.Panicf("cannot append mark to non normal revision bytes")
}
lg.Panic(
"cannot append tombstone mark to non-normal revision bytes",
zap.Int("expected-revision-bytes-size", revBytesLen),
zap.Int("given-revision-bytes-size", len(b)),
)
}
return append(b, markTombstone)
}

View File

@ -52,15 +52,11 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
revToBytes(revision{main: compactMainRev}, rbytes)
tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes)
tx.Unlock()
if s.lg != nil {
s.lg.Info(
"finished scheduled compaction",
zap.Int64("compact-revision", compactMainRev),
zap.Duration("took", time.Since(totalStart)),
)
} else {
plog.Infof("finished scheduled compaction at %d (took %v)", compactMainRev, time.Since(totalStart))
}
s.lg.Info(
"finished scheduled compaction",
zap.Int64("compact-revision", compactMainRev),
zap.Duration("took", time.Since(totalStart)),
)
return true
}

View File

@ -146,25 +146,17 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
revToBytes(revpair, revBytes)
_, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0)
if len(vs) != 1 {
if tr.s.lg != nil {
tr.s.lg.Fatal(
"range failed to find revision pair",
zap.Int64("revision-main", revpair.main),
zap.Int64("revision-sub", revpair.sub),
)
} else {
plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
}
tr.s.lg.Fatal(
"range failed to find revision pair",
zap.Int64("revision-main", revpair.main),
zap.Int64("revision-sub", revpair.sub),
)
}
if err := kvs[i].Unmarshal(vs[0]); err != nil {
if tr.s.lg != nil {
tr.s.lg.Fatal(
"failed to unmarshal mvccpb.KeyValue",
zap.Error(err),
)
} else {
plog.Fatalf("cannot unmarshal event: %v", err)
}
tr.s.lg.Fatal(
"failed to unmarshal mvccpb.KeyValue",
zap.Error(err),
)
}
}
tr.trace.Step("range keys from bolt db")
@ -200,14 +192,10 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
d, err := kv.Marshal()
if err != nil {
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to marshal mvccpb.KeyValue",
zap.Error(err),
)
} else {
plog.Fatalf("cannot marshal event: %v", err)
}
tw.storeTxnRead.s.lg.Fatal(
"failed to marshal mvccpb.KeyValue",
zap.Error(err),
)
}
tw.trace.Step("marshal mvccpb.KeyValue")
@ -222,14 +210,10 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
}
err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
if err != nil {
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to detach old lease from a key",
zap.Error(err),
)
} else {
plog.Errorf("unexpected error from lease detach: %v", err)
}
tw.storeTxnRead.s.lg.Error(
"failed to detach old lease from a key",
zap.Error(err),
)
}
}
if leaseID != lease.NoLease {
@ -264,39 +248,26 @@ func (tw *storeTxnWrite) delete(key []byte) {
idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
revToBytes(idxRev, ibytes)
if tw.storeTxnRead.s != nil && tw.storeTxnRead.s.lg != nil {
ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)
} else {
// TODO: remove this in v3.5
ibytes = appendMarkTombstone(nil, ibytes)
}
ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)
kv := mvccpb.KeyValue{Key: key}
d, err := kv.Marshal()
if err != nil {
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to marshal mvccpb.KeyValue",
zap.Error(err),
)
} else {
plog.Fatalf("cannot marshal event: %v", err)
}
tw.storeTxnRead.s.lg.Fatal(
"failed to marshal mvccpb.KeyValue",
zap.Error(err),
)
}
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
err = tw.s.kvindex.Tombstone(key, idxRev)
if err != nil {
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to tombstone an existing key",
zap.String("key", string(key)),
zap.Error(err),
)
} else {
plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
}
tw.storeTxnRead.s.lg.Fatal(
"failed to tombstone an existing key",
zap.String("key", string(key)),
zap.Error(err),
)
}
tw.changes = append(tw.changes, kv)
@ -306,14 +277,10 @@ func (tw *storeTxnWrite) delete(key []byte) {
if leaseID != lease.NoLease {
err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item})
if err != nil {
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to detach old lease from a key",
zap.Error(err),
)
} else {
plog.Errorf("cannot detach %v", err)
}
tw.storeTxnRead.s.lg.Error(
"failed to detach old lease from a key",
zap.Error(err),
)
}
}
}

View File

@ -74,6 +74,9 @@ func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexG
}
func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore {
if lg == nil {
lg = zap.NewNop()
}
s := &watchableStore{
store: NewStore(lg, b, le, ig, cfg),
victimc: make(chan struct{}, 1),
@ -351,12 +354,7 @@ func (s *watchableStore) syncWatchers() int {
tx.RLock()
revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
var evs []mvccpb.Event
if s.store != nil && s.store.lg != nil {
evs = kvsToEvents(s.store.lg, wg, revs, vs)
} else {
// TODO: remove this in v3.5
evs = kvsToEvents(nil, wg, revs, vs)
}
evs = kvsToEvents(s.store.lg, wg, revs, vs)
tx.RUnlock()
var victims watcherBatch
@ -412,11 +410,7 @@ func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []m
for i, v := range vals {
var kv mvccpb.KeyValue
if err := kv.Unmarshal(v); err != nil {
if lg != nil {
lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
} else {
plog.Panicf("cannot unmarshal event: %v", err)
}
lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
}
if !wg.contains(string(kv.Key)) {
@ -440,14 +434,10 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
var victim watcherBatch
for w, eb := range newWatcherBatch(&s.synced, evs) {
if eb.revs != 1 {
if s.store != nil && s.store.lg != nil {
s.store.lg.Panic(
"unexpected multiple revisions in watch notification",
zap.Int("number-of-revisions", eb.revs),
)
} else {
plog.Panicf("unexpected multiple revisions in notification")
}
s.store.lg.Panic(
"unexpected multiple revisions in watch notification",
zap.Int("number-of-revisions", eb.revs),
)
}
if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
pendingEventsGauge.Add(float64(len(eb.evs)))