diff --git a/mvcc/backend/backend.go b/mvcc/backend/backend.go index eb39bc367..2c16b5d4a 100644 --- a/mvcc/backend/backend.go +++ b/mvcc/backend/backend.go @@ -27,6 +27,8 @@ import ( bolt "github.com/coreos/bbolt" "github.com/coreos/pkg/capnslog" + humanize "github.com/dustin/go-humanize" + "go.uber.org/zap" ) var ( @@ -97,6 +99,8 @@ type backend struct { stopc chan struct{} donec chan struct{} + + lg *zap.Logger } type BackendConfig struct { @@ -108,6 +112,8 @@ type BackendConfig struct { BatchLimit int // MmapSize is the number of bytes to mmap for the backend. MmapSize uint64 + // Logger logs backend-side operations. + Logger *zap.Logger } func DefaultBackendConfig() BackendConfig { @@ -137,7 +143,11 @@ func newBackend(bcfg BackendConfig) *backend { db, err := bolt.Open(bcfg.Path, 0600, bopts) if err != nil { - plog.Panicf("cannot open database at %s (%v)", bcfg.Path, err) + if bcfg.Logger != nil { + bcfg.Logger.Panic("failed to open database", zap.String("path", bcfg.Path), zap.Error(err)) + } else { + plog.Panicf("cannot open database at %s (%v)", bcfg.Path, err) + } } // In future, may want to make buffering optional for low-concurrency systems @@ -157,6 +167,8 @@ func newBackend(bcfg BackendConfig) *backend { stopc: make(chan struct{}), donec: make(chan struct{}), + + lg: bcfg.Logger, } b.batchTx = newBatchTxBuffered(b) go b.run() @@ -204,7 +216,16 @@ func (b *backend) Snapshot() Snapshot { for { select { case <-ticker.C: - plog.Warningf("snapshotting is taking more than %v seconds to finish transferring %v MB [started at %v]", time.Since(start).Seconds(), float64(dbBytes)/float64(1024*1014), start) + if b.lg != nil { + b.lg.Warn( + "snapshotting taking too long to transfer", + zap.Duration("taking", time.Since(start)), + zap.Int64("bytes", dbBytes), + zap.String("size", humanize.Bytes(uint64(dbBytes))), + ) + } else { + plog.Warningf("snapshotting is taking more than %v seconds to finish transferring %v MB [started at %v]", time.Since(start).Seconds(), float64(dbBytes)/float64(1024*1014), start) + } case <-stopc: snapshotDurations.Observe(time.Since(start).Seconds()) return @@ -294,6 +315,8 @@ func (b *backend) Defrag() error { } func (b *backend) defrag() error { + now := time.Now() + // TODO: make this non-blocking? // lock batchTx to ensure nobody is using previous tx, and then // close previous ongoing tx. @@ -317,37 +340,67 @@ func (b *backend) defrag() error { return err } - err = defragdb(b.db, tmpdb, defragLimit) + dbp := b.db.Path() + tdbp := tmpdb.Path() + size1, sizeInUse1 := b.Size(), b.SizeInUse() + if b.lg != nil { + b.lg.Info( + "defragmenting", + zap.String("path", dbp), + zap.Int64("current-db-size-bytes", size1), + zap.String("current-db-size", humanize.Bytes(uint64(size1))), + zap.Int64("current-db-size-in-use-bytes", sizeInUse1), + zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse1))), + ) + } + err = defragdb(b.db, tmpdb, defragLimit) if err != nil { tmpdb.Close() os.RemoveAll(tmpdb.Path()) return err } - dbp := b.db.Path() - tdbp := tmpdb.Path() - err = b.db.Close() if err != nil { - plog.Fatalf("cannot close database (%s)", err) + if b.lg != nil { + b.lg.Fatal("failed to close database", zap.Error(err)) + } else { + plog.Fatalf("cannot close database (%s)", err) + } } err = tmpdb.Close() if err != nil { - plog.Fatalf("cannot close database (%s)", err) + if b.lg != nil { + b.lg.Fatal("failed to close tmp database", zap.Error(err)) + } else { + plog.Fatalf("cannot close database (%s)", err) + } } err = os.Rename(tdbp, dbp) if err != nil { - plog.Fatalf("cannot rename database (%s)", err) + if b.lg != nil { + b.lg.Fatal("failed to rename tmp database", zap.Error(err)) + } else { + plog.Fatalf("cannot rename database (%s)", err) + } } b.db, err = bolt.Open(dbp, 0600, boltOpenOptions) if err != nil { - plog.Panicf("cannot open database at %s (%v)", dbp, err) + if b.lg != nil { + b.lg.Fatal("failed to open database", zap.String("path", dbp), zap.Error(err)) + } else { + plog.Panicf("cannot open database at %s (%v)", dbp, err) + } } b.batchTx.tx, err = b.db.Begin(true) if err != nil { - plog.Fatalf("cannot begin tx (%s)", err) + if b.lg != nil { + b.lg.Fatal("failed to begin tx", zap.Error(err)) + } else { + plog.Fatalf("cannot begin tx (%s)", err) + } } b.readTx.reset() @@ -358,6 +411,20 @@ func (b *backend) defrag() error { atomic.StoreInt64(&b.size, size) atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize))) + size2, sizeInUse2 := b.Size(), b.SizeInUse() + if b.lg != nil { + b.lg.Info( + "defragmented", + zap.String("path", dbp), + zap.Int64("current-db-size-bytes-diff", size2-size1), + zap.Int64("current-db-size-bytes", size2), + zap.String("current-db-size", humanize.Bytes(uint64(size2))), + zap.Int64("current-db-size-in-use-bytes-diff", sizeInUse2-sizeInUse1), + zap.Int64("current-db-size-in-use-bytes", sizeInUse2), + zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse2))), + zap.Duration("took", time.Since(now)), + ) + } return nil } @@ -429,7 +496,11 @@ func (b *backend) begin(write bool) *bolt.Tx { func (b *backend) unsafeBegin(write bool) *bolt.Tx { tx, err := b.db.Begin(write) if err != nil { - plog.Fatalf("cannot begin tx (%s)", err) + if b.lg != nil { + b.lg.Fatal("failed to begin tx", zap.Error(err)) + } else { + plog.Fatalf("cannot begin tx (%s)", err) + } } return tx } @@ -438,7 +509,7 @@ func (b *backend) unsafeBegin(write bool) *bolt.Tx { func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) { dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test") if err != nil { - plog.Fatal(err) + panic(err) } tmpPath := filepath.Join(dir, "database") bcfg := DefaultBackendConfig() diff --git a/mvcc/index.go b/mvcc/index.go index b27a9e543..626de3825 100644 --- a/mvcc/index.go +++ b/mvcc/index.go @@ -19,6 +19,7 @@ import ( "sync" "github.com/google/btree" + "go.uber.org/zap" ) type index interface { @@ -39,11 +40,13 @@ type index interface { type treeIndex struct { sync.RWMutex tree *btree.BTree + lg *zap.Logger } -func newTreeIndex() index { +func newTreeIndex(lg *zap.Logger) index { return &treeIndex{ tree: btree.New(32), + lg: lg, } } @@ -183,7 +186,11 @@ func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision { func (ti *treeIndex) Compact(rev int64) map[revision]struct{} { available := make(map[revision]struct{}) var emptyki []*keyIndex - plog.Printf("store.index: compact %d", rev) + if ti.lg != nil { + ti.lg.Info("compact tree index", zap.Int64("revision", rev)) + } else { + plog.Printf("store.index: compact %d", rev) + } // TODO: do not hold the lock for long time? // This is probably OK. Compacting 10M keys takes O(10ms). ti.Lock() @@ -192,7 +199,11 @@ func (ti *treeIndex) Compact(rev int64) map[revision]struct{} { for _, ki := range emptyki { item := ti.tree.Delete(ki) if item == nil { - plog.Panic("store.index: unexpected delete failure during compaction") + if ti.lg != nil { + ti.lg.Panic("failed to delete during compaction") + } else { + plog.Panic("store.index: unexpected delete failure during compaction") + } } } return available diff --git a/mvcc/index_test.go b/mvcc/index_test.go index d05315601..0016874e4 100644 --- a/mvcc/index_test.go +++ b/mvcc/index_test.go @@ -19,10 +19,11 @@ import ( "testing" "github.com/google/btree" + "go.uber.org/zap" ) func TestIndexGet(t *testing.T) { - ti := newTreeIndex() + ti := newTreeIndex(zap.NewExample()) ti.Put([]byte("foo"), revision{main: 2}) ti.Put([]byte("foo"), revision{main: 4}) ti.Tombstone([]byte("foo"), revision{main: 6}) @@ -64,7 +65,7 @@ func TestIndexRange(t *testing.T) { allKeys := [][]byte{[]byte("foo"), []byte("foo1"), []byte("foo2")} allRevs := []revision{{main: 1}, {main: 2}, {main: 3}} - ti := newTreeIndex() + ti := newTreeIndex(zap.NewExample()) for i := range allKeys { ti.Put(allKeys[i], allRevs[i]) } @@ -120,7 +121,7 @@ func TestIndexRange(t *testing.T) { } func TestIndexTombstone(t *testing.T) { - ti := newTreeIndex() + ti := newTreeIndex(zap.NewExample()) ti.Put([]byte("foo"), revision{main: 1}) err := ti.Tombstone([]byte("foo"), revision{main: 2}) @@ -142,7 +143,7 @@ func TestIndexRangeSince(t *testing.T) { allKeys := [][]byte{[]byte("foo"), []byte("foo1"), []byte("foo2"), []byte("foo2"), []byte("foo1"), []byte("foo")} allRevs := []revision{{main: 1}, {main: 2}, {main: 3}, {main: 4}, {main: 5}, {main: 6}} - ti := newTreeIndex() + ti := newTreeIndex(zap.NewExample()) for i := range allKeys { ti.Put(allKeys[i], allRevs[i]) } @@ -216,7 +217,7 @@ func TestIndexCompactAndKeep(t *testing.T) { } // Continuous Compact and Keep - ti := newTreeIndex() + ti := newTreeIndex(zap.NewExample()) for _, tt := range tests { if tt.remove { ti.Tombstone(tt.key, tt.rev) @@ -247,7 +248,7 @@ func TestIndexCompactAndKeep(t *testing.T) { // Once Compact and Keep for i := int64(1); i < maxRev; i++ { - ti := newTreeIndex() + ti := newTreeIndex(zap.NewExample()) for _, tt := range tests { if tt.remove { ti.Tombstone(tt.key, tt.rev) diff --git a/mvcc/kv_test.go b/mvcc/kv_test.go index 2d7dc01ff..ae233b198 100644 --- a/mvcc/kv_test.go +++ b/mvcc/kv_test.go @@ -28,6 +28,7 @@ import ( "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" + "go.uber.org/zap" ) // Functional tests for features implemented in v3 store. It treats v3 store @@ -75,7 +76,7 @@ func TestKVTxnRange(t *testing.T) { testKVRange(t, txnRangeFunc) } func testKVRange(t *testing.T, f rangeFunc) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}, nil) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) kvs := put3TestKVs(s) @@ -141,7 +142,7 @@ func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, txnRangeFunc) } func testKVRangeRev(t *testing.T, f rangeFunc) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}, nil) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) kvs := put3TestKVs(s) @@ -177,7 +178,7 @@ func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, txnRangeFunc) } func testKVRangeBadRev(t *testing.T, f rangeFunc) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}, nil) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) put3TestKVs(s) @@ -210,7 +211,7 @@ func TestKVTxnRangeLimit(t *testing.T) { testKVRangeLimit(t, txnRangeFunc) } func testKVRangeLimit(t *testing.T, f rangeFunc) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}, nil) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) kvs := put3TestKVs(s) @@ -251,7 +252,7 @@ func TestKVTxnPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, txnPutF func testKVPutMultipleTimes(t *testing.T, f putFunc) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}, nil) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) for i := 0; i < 10; i++ { @@ -313,7 +314,7 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) { for i, tt := range tests { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}, nil) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) s.Put([]byte("foo"), []byte("bar"), lease.NoLease) s.Put([]byte("foo1"), []byte("bar1"), lease.NoLease) @@ -333,7 +334,7 @@ func TestKVTxnDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, t func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}, nil) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) s.Put([]byte("foo"), []byte("bar"), lease.NoLease) @@ -354,7 +355,7 @@ func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) { // test that range, put, delete on single key in sequence repeatedly works correctly. func TestKVOperationInSequence(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}, nil) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) for i := 0; i < 10; i++ { @@ -401,7 +402,7 @@ func TestKVOperationInSequence(t *testing.T) { func TestKVTxnBlockWriteOperations(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}, nil) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) tests := []func(){ func() { s.Put([]byte("foo"), nil, lease.NoLease) }, @@ -434,7 +435,7 @@ func TestKVTxnBlockWriteOperations(t *testing.T) { func TestKVTxnNonBlockRange(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}, nil) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) txn := s.Write() @@ -455,7 +456,7 @@ func TestKVTxnNonBlockRange(t *testing.T) { // test that txn range, put, delete on single key in sequence repeatedly works correctly. func TestKVTxnOperationInSequence(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}, nil) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) for i := 0; i < 10; i++ { @@ -505,7 +506,7 @@ func TestKVTxnOperationInSequence(t *testing.T) { func TestKVCompactReserveLastValue(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}, nil) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) s.Put([]byte("foo"), []byte("bar0"), 1) @@ -559,7 +560,7 @@ func TestKVCompactReserveLastValue(t *testing.T) { func TestKVCompactBad(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}, nil) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) s.Put([]byte("foo"), []byte("bar0"), lease.NoLease) @@ -592,7 +593,7 @@ func TestKVHash(t *testing.T) { for i := 0; i < len(hashes); i++ { var err error b, tmpPath := backend.NewDefaultTmpBackend() - kv := NewStore(b, &lease.FakeLessor{}, nil) + kv := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) kv.Put([]byte("foo0"), []byte("bar0"), lease.NoLease) kv.Put([]byte("foo1"), []byte("bar0"), lease.NoLease) hashes[i], _, err = kv.Hash() @@ -630,7 +631,7 @@ func TestKVRestore(t *testing.T) { } for i, tt := range tests { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}, nil) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) tt(s) var kvss [][]mvccpb.KeyValue for k := int64(0); k < 10; k++ { @@ -642,7 +643,7 @@ func TestKVRestore(t *testing.T) { s.Close() // ns should recover the the previous state from backend. - ns := NewStore(b, &lease.FakeLessor{}, nil) + ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) if keysRestore := readGaugeInt(&keysGauge); keysBefore != keysRestore { t.Errorf("#%d: got %d key count, expected %d", i, keysRestore, keysBefore) @@ -674,7 +675,7 @@ func readGaugeInt(g *prometheus.Gauge) int { func TestKVSnapshot(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}, nil) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) wkvs := put3TestKVs(s) @@ -694,7 +695,7 @@ func TestKVSnapshot(t *testing.T) { } f.Close() - ns := NewStore(b, &lease.FakeLessor{}, nil) + ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer ns.Close() r, err := ns.Range([]byte("a"), []byte("z"), RangeOptions{}) if err != nil { @@ -710,7 +711,7 @@ func TestKVSnapshot(t *testing.T) { func TestWatchableKVWatch(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index 73f3c4c83..f03b6311e 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -30,6 +30,7 @@ import ( "github.com/coreos/etcd/pkg/schedule" "github.com/coreos/pkg/capnslog" + "go.uber.org/zap" ) var ( @@ -100,15 +101,17 @@ type store struct { fifoSched schedule.Scheduler stopc chan struct{} + + lg *zap.Logger } // NewStore returns a new store. It is useful to create a store inside // mvcc pkg. It should only be used for testing externally. -func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *store { +func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *store { s := &store{ b: b, ig: ig, - kvindex: newTreeIndex(), + kvindex: newTreeIndex(lg), le: le, @@ -119,6 +122,8 @@ func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *sto fifoSched: schedule.NewFIFOScheduler(), stopc: make(chan struct{}), + + lg: lg, } s.ReadView = &readView{s} s.WriteView = &writeView{s} @@ -291,7 +296,7 @@ func (s *store) Restore(b backend.Backend) error { atomic.StoreUint64(&s.consistentIndex, 0) s.b = b - s.kvindex = newTreeIndex() + s.kvindex = newTreeIndex(s.lg) s.currentRev = 1 s.compactMainRev = -1 s.fifoSched = schedule.NewFIFOScheduler() diff --git a/mvcc/kvstore_bench_test.go b/mvcc/kvstore_bench_test.go index a64a3c5a5..6d38cd74e 100644 --- a/mvcc/kvstore_bench_test.go +++ b/mvcc/kvstore_bench_test.go @@ -20,6 +20,8 @@ import ( "github.com/coreos/etcd/lease" "github.com/coreos/etcd/mvcc/backend" + + "go.uber.org/zap" ) type fakeConsistentIndex uint64 @@ -31,7 +33,7 @@ func (i *fakeConsistentIndex) ConsistentIndex() uint64 { func BenchmarkStorePut(b *testing.B) { var i fakeConsistentIndex be, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(be, &lease.FakeLessor{}, &i) + s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i) defer cleanup(s, be, tmpPath) // arbitrary number of bytes @@ -51,7 +53,7 @@ func BenchmarkStoreRangeKey100(b *testing.B) { benchmarkStoreRange(b, 100) } func benchmarkStoreRange(b *testing.B, n int) { var i fakeConsistentIndex be, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(be, &lease.FakeLessor{}, &i) + s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i) defer cleanup(s, be, tmpPath) // 64 byte key/val @@ -79,7 +81,7 @@ func benchmarkStoreRange(b *testing.B, n int) { func BenchmarkConsistentIndex(b *testing.B) { fci := fakeConsistentIndex(10) be, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(be, &lease.FakeLessor{}, &fci) + s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &fci) defer cleanup(s, be, tmpPath) tx := s.b.BatchTx() @@ -98,7 +100,7 @@ func BenchmarkConsistentIndex(b *testing.B) { func BenchmarkStorePutUpdate(b *testing.B) { var i fakeConsistentIndex be, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(be, &lease.FakeLessor{}, &i) + s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i) defer cleanup(s, be, tmpPath) // arbitrary number of bytes @@ -117,7 +119,7 @@ func BenchmarkStorePutUpdate(b *testing.B) { func BenchmarkStoreTxnPut(b *testing.B) { var i fakeConsistentIndex be, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(be, &lease.FakeLessor{}, &i) + s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i) defer cleanup(s, be, tmpPath) // arbitrary number of bytes @@ -138,7 +140,7 @@ func BenchmarkStoreTxnPut(b *testing.B) { func benchmarkStoreRestore(revsPerKey int, b *testing.B) { var i fakeConsistentIndex be, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(be, &lease.FakeLessor{}, &i) + s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i) // use closure to capture 's' to pick up the reassignment defer func() { cleanup(s, be, tmpPath) }() @@ -158,7 +160,7 @@ func benchmarkStoreRestore(revsPerKey int, b *testing.B) { b.ReportAllocs() b.ResetTimer() - s = NewStore(be, &lease.FakeLessor{}, &i) + s = NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i) } func BenchmarkStoreRestoreRevs1(b *testing.B) { diff --git a/mvcc/kvstore_compaction.go b/mvcc/kvstore_compaction.go index 1726490c1..e7cfec1a5 100644 --- a/mvcc/kvstore_compaction.go +++ b/mvcc/kvstore_compaction.go @@ -17,6 +17,8 @@ package mvcc import ( "encoding/binary" "time" + + "go.uber.org/zap" ) func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struct{}) bool { @@ -51,7 +53,15 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc revToBytes(revision{main: compactMainRev}, rbytes) tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes) tx.Unlock() - plog.Printf("finished scheduled compaction at %d (took %v)", compactMainRev, time.Since(totalStart)) + if s.lg != nil { + s.lg.Info( + "finished scheduled compaction", + zap.Int64("compact-revision", compactMainRev), + zap.Duration("took", time.Since(totalStart)), + ) + } else { + plog.Printf("finished scheduled compaction at %d (took %v)", compactMainRev, time.Since(totalStart)) + } return true } diff --git a/mvcc/kvstore_compaction_test.go b/mvcc/kvstore_compaction_test.go index b2ee570f9..546578c5a 100644 --- a/mvcc/kvstore_compaction_test.go +++ b/mvcc/kvstore_compaction_test.go @@ -22,6 +22,7 @@ import ( "github.com/coreos/etcd/lease" "github.com/coreos/etcd/mvcc/backend" + "go.uber.org/zap" ) func TestScheduleCompaction(t *testing.T) { @@ -64,7 +65,7 @@ func TestScheduleCompaction(t *testing.T) { } for i, tt := range tests { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}, nil) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) tx := s.b.BatchTx() tx.Lock() @@ -98,7 +99,7 @@ func TestScheduleCompaction(t *testing.T) { func TestCompactAllAndRestore(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s0 := NewStore(b, &lease.FakeLessor{}, nil) + s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer os.Remove(tmpPath) s0.Put([]byte("foo"), []byte("bar"), lease.NoLease) @@ -124,7 +125,7 @@ func TestCompactAllAndRestore(t *testing.T) { t.Fatal(err) } - s1 := NewStore(b, &lease.FakeLessor{}, nil) + s1 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) if s1.Rev() != rev { t.Errorf("rev = %v, want %v", s1.Rev(), rev) } diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go index 923a4d5b6..157247606 100644 --- a/mvcc/kvstore_test.go +++ b/mvcc/kvstore_test.go @@ -31,11 +31,12 @@ import ( "github.com/coreos/etcd/mvcc/mvccpb" "github.com/coreos/etcd/pkg/schedule" "github.com/coreos/etcd/pkg/testutil" + "go.uber.org/zap" ) func TestStoreRev(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}, nil) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer s.Close() defer os.Remove(tmpPath) @@ -419,7 +420,7 @@ func TestRestoreDelete(t *testing.T) { defer func() { restoreChunkKeys = oldChunk }() b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}, nil) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer os.Remove(tmpPath) keys := make(map[string]struct{}) @@ -445,7 +446,7 @@ func TestRestoreDelete(t *testing.T) { } s.Close() - s = NewStore(b, &lease.FakeLessor{}, nil) + s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer s.Close() for i := 0; i < 20; i++ { ks := fmt.Sprintf("foo-%d", i) @@ -465,7 +466,7 @@ func TestRestoreDelete(t *testing.T) { func TestRestoreContinueUnfinishedCompaction(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s0 := NewStore(b, &lease.FakeLessor{}, nil) + s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer os.Remove(tmpPath) s0.Put([]byte("foo"), []byte("bar"), lease.NoLease) @@ -482,7 +483,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) { s0.Close() - s1 := NewStore(b, &lease.FakeLessor{}, nil) + s1 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) // wait for scheduled compaction to be finished time.Sleep(100 * time.Millisecond) @@ -519,7 +520,7 @@ type hashKVResult struct { // TestHashKVWhenCompacting ensures that HashKV returns correct hash when compacting. func TestHashKVWhenCompacting(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}, nil) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer os.Remove(tmpPath) rev := 10000 @@ -587,7 +588,7 @@ func TestHashKVWhenCompacting(t *testing.T) { // correct hash value with latest revision. func TestHashKVZeroRevision(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}, nil) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer os.Remove(tmpPath) rev := 1000 @@ -620,7 +621,7 @@ func TestTxnPut(t *testing.T) { vals := createBytesSlice(bytesN, sliceN) b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}, nil) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) for i := 0; i < sliceN; i++ { @@ -635,7 +636,7 @@ func TestTxnPut(t *testing.T) { func TestTxnBlockBackendForceCommit(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}, nil) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer os.Remove(tmpPath) txn := s.Read() diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index 14bb14ce2..1c70d6ade 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -21,6 +21,7 @@ import ( "github.com/coreos/etcd/lease" "github.com/coreos/etcd/mvcc/backend" "github.com/coreos/etcd/mvcc/mvccpb" + "go.uber.org/zap" ) // non-const so modifiable by tests @@ -67,13 +68,13 @@ type watchableStore struct { // cancel operations. type cancelFunc func() -func New(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) ConsistentWatchableKV { - return newWatchableStore(b, le, ig) +func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) ConsistentWatchableKV { + return newWatchableStore(lg, b, le, ig) } -func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *watchableStore { +func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *watchableStore { s := &watchableStore{ - store: NewStore(b, le, ig), + store: NewStore(lg, b, le, ig), victimc: make(chan struct{}, 1), unsynced: newWatcherGroup(), synced: newWatcherGroup(), diff --git a/mvcc/watchable_store_bench_test.go b/mvcc/watchable_store_bench_test.go index 198fea6bb..f81cbb238 100644 --- a/mvcc/watchable_store_bench_test.go +++ b/mvcc/watchable_store_bench_test.go @@ -21,11 +21,13 @@ import ( "github.com/coreos/etcd/lease" "github.com/coreos/etcd/mvcc/backend" + + "go.uber.org/zap" ) func BenchmarkWatchableStorePut(b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() - s := New(be, &lease.FakeLessor{}, nil) + s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil) defer cleanup(s, be, tmpPath) // arbitrary number of bytes @@ -46,7 +48,7 @@ func BenchmarkWatchableStorePut(b *testing.B) { func BenchmarkWatchableStoreTxnPut(b *testing.B) { var i fakeConsistentIndex be, tmpPath := backend.NewDefaultTmpBackend() - s := New(be, &lease.FakeLessor{}, &i) + s := New(zap.NewExample(), be, &lease.FakeLessor{}, &i) defer cleanup(s, be, tmpPath) // arbitrary number of bytes @@ -67,7 +69,7 @@ func BenchmarkWatchableStoreTxnPut(b *testing.B) { // many synced watchers receiving a Put notification. func BenchmarkWatchableStoreWatchSyncPut(b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(be, &lease.FakeLessor{}, nil) + s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil) defer cleanup(s, be, tmpPath) k := []byte("testkey") @@ -105,7 +107,7 @@ func BenchmarkWatchableStoreWatchSyncPut(b *testing.B) { // we should put to simulate the real-world use cases. func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(be, &lease.FakeLessor{}, nil) + s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, nil) // manually create watchableStore instead of newWatchableStore // because newWatchableStore periodically calls syncWatchersLoop @@ -162,7 +164,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { func BenchmarkWatchableStoreSyncedCancel(b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(be, &lease.FakeLessor{}, nil) + s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil) defer func() { s.store.Close() diff --git a/mvcc/watchable_store_test.go b/mvcc/watchable_store_test.go index 28762fad4..ec784e85a 100644 --- a/mvcc/watchable_store_test.go +++ b/mvcc/watchable_store_test.go @@ -26,11 +26,12 @@ import ( "github.com/coreos/etcd/lease" "github.com/coreos/etcd/mvcc/backend" "github.com/coreos/etcd/mvcc/mvccpb" + "go.uber.org/zap" ) func TestWatch(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(b, &lease.FakeLessor{}, nil) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer func() { s.store.Close() @@ -52,7 +53,7 @@ func TestWatch(t *testing.T) { func TestNewWatcherCancel(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(b, &lease.FakeLessor{}, nil) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer func() { s.store.Close() @@ -84,7 +85,7 @@ func TestCancelUnsynced(t *testing.T) { // method to sync watchers in unsynced map. We want to keep watchers // in unsynced to test if syncWatchers works as expected. s := &watchableStore{ - store: NewStore(b, &lease.FakeLessor{}, nil), + store: NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil), unsynced: newWatcherGroup(), // to make the test not crash from assigning to nil map. @@ -139,7 +140,7 @@ func TestSyncWatchers(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() s := &watchableStore{ - store: NewStore(b, &lease.FakeLessor{}, nil), + store: NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil), unsynced: newWatcherGroup(), synced: newWatcherGroup(), } @@ -222,7 +223,7 @@ func TestSyncWatchers(t *testing.T) { // TestWatchCompacted tests a watcher that watches on a compacted revision. func TestWatchCompacted(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(b, &lease.FakeLessor{}, nil) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer func() { s.store.Close() @@ -259,7 +260,7 @@ func TestWatchCompacted(t *testing.T) { func TestWatchFutureRev(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(b, &lease.FakeLessor{}, nil) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer func() { s.store.Close() @@ -300,7 +301,7 @@ func TestWatchRestore(t *testing.T) { test := func(delay time.Duration) func(t *testing.T) { return func(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(b, &lease.FakeLessor{}, nil) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) testKey := []byte("foo") @@ -308,7 +309,7 @@ func TestWatchRestore(t *testing.T) { rev := s.Put(testKey, testValue, lease.NoLease) newBackend, newPath := backend.NewDefaultTmpBackend() - newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil) + newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, nil) defer cleanup(newStore, newBackend, newPath) w := newStore.NewWatchStream() @@ -341,7 +342,7 @@ func TestWatchRestore(t *testing.T) { // TestWatchBatchUnsynced tests batching on unsynced watchers func TestWatchBatchUnsynced(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(b, &lease.FakeLessor{}, nil) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) oldMaxRevs := watchBatchMaxRevs defer func() { @@ -475,7 +476,7 @@ func TestWatchVictims(t *testing.T) { oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(b, &lease.FakeLessor{}, nil) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer func() { s.store.Close() @@ -553,7 +554,7 @@ func TestWatchVictims(t *testing.T) { // canceling its watches. func TestStressWatchCancelClose(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(b, &lease.FakeLessor{}, nil) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer func() { s.store.Close() diff --git a/mvcc/watcher_bench_test.go b/mvcc/watcher_bench_test.go index 86cbea7df..c8244f65b 100644 --- a/mvcc/watcher_bench_test.go +++ b/mvcc/watcher_bench_test.go @@ -20,11 +20,13 @@ import ( "github.com/coreos/etcd/lease" "github.com/coreos/etcd/mvcc/backend" + + "go.uber.org/zap" ) func BenchmarkKVWatcherMemoryUsage(b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() - watchable := newWatchableStore(be, &lease.FakeLessor{}, nil) + watchable := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil) defer cleanup(watchable, be, tmpPath) diff --git a/mvcc/watcher_test.go b/mvcc/watcher_test.go index ad5b54d7a..948158424 100644 --- a/mvcc/watcher_test.go +++ b/mvcc/watcher_test.go @@ -25,13 +25,14 @@ import ( "github.com/coreos/etcd/lease" "github.com/coreos/etcd/mvcc/backend" "github.com/coreos/etcd/mvcc/mvccpb" + "go.uber.org/zap" ) // TestWatcherWatchID tests that each watcher provides unique watchID, // and the watched event attaches the correct watchID. func TestWatcherWatchID(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -81,7 +82,7 @@ func TestWatcherWatchID(t *testing.T) { func TestWatcherRequestsCustomID(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -118,7 +119,7 @@ func TestWatcherRequestsCustomID(t *testing.T) { // and returns events with matching prefixes. func TestWatcherWatchPrefix(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -192,7 +193,7 @@ func TestWatcherWatchPrefix(t *testing.T) { // does not create watcher, which panics when canceling in range tree. func TestWatcherWatchWrongRange(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -212,7 +213,7 @@ func TestWatcherWatchWrongRange(t *testing.T) { func TestWatchDeleteRange(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(b, &lease.FakeLessor{}, nil) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer func() { s.store.Close() @@ -251,7 +252,7 @@ func TestWatchDeleteRange(t *testing.T) { // with given id inside watchStream. func TestWatchStreamCancelWatcherByID(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -294,7 +295,7 @@ func TestWatcherRequestProgress(t *testing.T) { // method to sync watchers in unsynced map. We want to keep watchers // in unsynced to test if syncWatchers works as expected. s := &watchableStore{ - store: NewStore(b, &lease.FakeLessor{}, nil), + store: NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil), unsynced: newWatcherGroup(), synced: newWatcherGroup(), } @@ -343,7 +344,7 @@ func TestWatcherRequestProgress(t *testing.T) { func TestWatcherWatchWithFilter(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)) defer cleanup(s, b, tmpPath) w := s.NewWatchStream()