diff --git a/mvcc/backend/backend.go b/mvcc/backend/backend.go index fda35d729..2b48cd3e2 100644 --- a/mvcc/backend/backend.go +++ b/mvcc/backend/backend.go @@ -19,7 +19,6 @@ import ( "hash/crc32" "io" "io/ioutil" - "log" "os" "path" "sync" @@ -27,6 +26,7 @@ import ( "time" "github.com/boltdb/bolt" + "github.com/coreos/pkg/capnslog" ) var ( @@ -39,6 +39,8 @@ var ( // the potential max db size can prevent writer from blocking reader. // This only works for linux. InitialMmapSize = int64(10 * 1024 * 1024 * 1024) + + plog = capnslog.NewPackageLogger("github.com/coreos/etcd/mvcc", "backend") ) const ( @@ -101,7 +103,7 @@ func NewDefaultBackend(path string) Backend { func newBackend(path string, d time.Duration, limit int) *backend { db, err := bolt.Open(path, 0600, boltOpenOptions) if err != nil { - log.Panicf("backend: cannot open database at %s (%v)", path, err) + plog.Panicf("cannot open database at %s (%v)", path, err) } b := &backend{ @@ -137,7 +139,7 @@ func (b *backend) Snapshot() Snapshot { defer b.mu.RUnlock() tx, err := b.db.Begin(false) if err != nil { - log.Fatalf("backend: cannot begin tx (%s)", err) + plog.Fatalf("cannot begin tx (%s)", err) } return &snapshot{tx} } @@ -244,24 +246,24 @@ func (b *backend) defrag() error { err = b.db.Close() if err != nil { - log.Fatalf("backend: cannot close database (%s)", err) + plog.Fatalf("cannot close database (%s)", err) } err = tmpdb.Close() if err != nil { - log.Fatalf("backend: cannot close database (%s)", err) + plog.Fatalf("cannot close database (%s)", err) } err = os.Rename(tdbp, dbp) if err != nil { - log.Fatalf("backend: cannot rename database (%s)", err) + plog.Fatalf("cannot rename database (%s)", err) } b.db, err = bolt.Open(dbp, 0600, boltOpenOptions) if err != nil { - log.Panicf("backend: cannot open database at %s (%v)", dbp, err) + plog.Panicf("cannot open database at %s (%v)", dbp, err) } b.batchTx.tx, err = b.db.Begin(true) if err != nil { - log.Fatalf("backend: cannot begin tx (%s)", err) + plog.Fatalf("cannot begin tx (%s)", err) } return nil @@ -320,7 +322,7 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error { func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) { dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test") if err != nil { - log.Fatal(err) + plog.Fatal(err) } tmpPath := path.Join(dir, "database") return newBackend(tmpPath, batchInterval, batchLimit), tmpPath diff --git a/mvcc/backend/batch_tx.go b/mvcc/backend/batch_tx.go index 057b011e5..e1cdcc65a 100644 --- a/mvcc/backend/batch_tx.go +++ b/mvcc/backend/batch_tx.go @@ -16,7 +16,6 @@ package backend import ( "bytes" - "log" "sync" "sync/atomic" "time" @@ -53,7 +52,7 @@ func newBatchTx(backend *backend) *batchTx { func (t *batchTx) UnsafeCreateBucket(name []byte) { _, err := t.tx.CreateBucket(name) if err != nil && err != bolt.ErrBucketExists { - log.Fatalf("mvcc: cannot create bucket %s (%v)", name, err) + plog.Fatalf("cannot create bucket %s (%v)", name, err) } t.pending++ } @@ -71,7 +70,7 @@ func (t *batchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) { func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) { bucket := t.tx.Bucket(bucketName) if bucket == nil { - log.Fatalf("mvcc: bucket %s does not exist", bucketName) + plog.Fatalf("bucket %s does not exist", bucketName) } if seq { // it is useful to increase fill percent when the workloads are mostly append-only. @@ -79,7 +78,7 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo bucket.FillPercent = 0.9 } if err := bucket.Put(key, value); err != nil { - log.Fatalf("mvcc: cannot put key into bucket (%v)", err) + plog.Fatalf("cannot put key into bucket (%v)", err) } t.pending++ } @@ -88,7 +87,7 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo func (t *batchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) { bucket := t.tx.Bucket(bucketName) if bucket == nil { - log.Fatalf("mvcc: bucket %s does not exist", bucketName) + plog.Fatalf("bucket %s does not exist", bucketName) } if len(endKey) == 0 { @@ -115,11 +114,11 @@ func (t *batchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64 func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) { bucket := t.tx.Bucket(bucketName) if bucket == nil { - log.Fatalf("mvcc: bucket %s does not exist", bucketName) + plog.Fatalf("bucket %s does not exist", bucketName) } err := bucket.Delete(key) if err != nil { - log.Fatalf("mvcc: cannot delete key from bucket (%v)", err) + plog.Fatalf("cannot delete key from bucket (%v)", err) } t.pending++ } @@ -173,7 +172,7 @@ func (t *batchTx) commit(stop bool) { t.pending = 0 if err != nil { - log.Fatalf("mvcc: cannot commit tx (%s)", err) + plog.Fatalf("cannot commit tx (%s)", err) } } @@ -186,7 +185,7 @@ func (t *batchTx) commit(stop bool) { // begin a new tx t.tx, err = t.backend.db.Begin(true) if err != nil { - log.Fatalf("mvcc: cannot begin tx (%s)", err) + plog.Fatalf("cannot begin tx (%s)", err) } atomic.StoreInt64(&t.backend.size, t.tx.Size()) } diff --git a/mvcc/index.go b/mvcc/index.go index 568361d2a..e8e489b4d 100644 --- a/mvcc/index.go +++ b/mvcc/index.go @@ -15,7 +15,6 @@ package mvcc import ( - "log" "sort" "sync" @@ -169,7 +168,7 @@ func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision { func (ti *treeIndex) Compact(rev int64) map[revision]struct{} { available := make(map[revision]struct{}) var emptyki []*keyIndex - log.Printf("store.index: compact %d", rev) + plog.Printf("store.index: compact %d", rev) // TODO: do not hold the lock for long time? // This is probably OK. Compacting 10M keys takes O(10ms). ti.Lock() @@ -178,7 +177,7 @@ func (ti *treeIndex) Compact(rev int64) map[revision]struct{} { for _, ki := range emptyki { item := ti.tree.Delete(ki) if item == nil { - log.Panic("store.index: unexpected delete failure during compaction") + plog.Panic("store.index: unexpected delete failure during compaction") } } return available diff --git a/mvcc/key_index.go b/mvcc/key_index.go index 8119fa403..983c64e2f 100644 --- a/mvcc/key_index.go +++ b/mvcc/key_index.go @@ -18,7 +18,6 @@ import ( "bytes" "errors" "fmt" - "log" "github.com/google/btree" ) @@ -78,7 +77,7 @@ func (ki *keyIndex) put(main int64, sub int64) { rev := revision{main: main, sub: sub} if !rev.GreaterThan(ki.modified) { - log.Panicf("store.keyindex: put with unexpected smaller revision [%v / %v]", rev, ki.modified) + plog.Panicf("store.keyindex: put with unexpected smaller revision [%v / %v]", rev, ki.modified) } if len(ki.generations) == 0 { ki.generations = append(ki.generations, generation{}) @@ -95,7 +94,7 @@ func (ki *keyIndex) put(main int64, sub int64) { func (ki *keyIndex) restore(created, modified revision, ver int64) { if len(ki.generations) != 0 { - log.Panicf("store.keyindex: cannot restore non-empty keyIndex") + plog.Panicf("store.keyindex: cannot restore non-empty keyIndex") } ki.modified = modified @@ -109,7 +108,7 @@ func (ki *keyIndex) restore(created, modified revision, ver int64) { // It returns ErrRevisionNotFound when tombstone on an empty generation. func (ki *keyIndex) tombstone(main int64, sub int64) error { if ki.isEmpty() { - log.Panicf("store.keyindex: unexpected tombstone on empty keyIndex %s", string(ki.key)) + plog.Panicf("store.keyindex: unexpected tombstone on empty keyIndex %s", string(ki.key)) } if ki.generations[len(ki.generations)-1].isEmpty() { return ErrRevisionNotFound @@ -124,7 +123,7 @@ func (ki *keyIndex) tombstone(main int64, sub int64) error { // Rev must be higher than or equal to the given atRev. func (ki *keyIndex) get(atRev int64) (modified, created revision, ver int64, err error) { if ki.isEmpty() { - log.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key)) + plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key)) } g := ki.findGeneration(atRev) if g.isEmpty() { @@ -144,7 +143,7 @@ func (ki *keyIndex) get(atRev int64) (modified, created revision, ver int64, err // main revision. func (ki *keyIndex) since(rev int64) []revision { if ki.isEmpty() { - log.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key)) + plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key)) } since := revision{rev, 0} var gi int @@ -185,7 +184,7 @@ func (ki *keyIndex) since(rev int64) []revision { // If a generation becomes empty during compaction, it will be removed. func (ki *keyIndex) compact(atRev int64, available map[revision]struct{}) { if ki.isEmpty() { - log.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key)) + plog.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key)) } // walk until reaching the first revision that has an revision smaller or equal to diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index 01aca0708..036950af5 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -17,7 +17,6 @@ package mvcc import ( "encoding/binary" "errors" - "log" "math" "math/rand" "sync" @@ -27,6 +26,7 @@ import ( "github.com/coreos/etcd/mvcc/backend" "github.com/coreos/etcd/mvcc/mvccpb" "github.com/coreos/etcd/pkg/schedule" + "github.com/coreos/pkg/capnslog" "golang.org/x/net/context" ) @@ -49,6 +49,8 @@ var ( ErrCompacted = errors.New("mvcc: required revision has been compacted") ErrFutureRev = errors.New("mvcc: required revision is a future revision") ErrCanceled = errors.New("mvcc: watcher is canceled") + + plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc") ) // ConsistentIndexGetter is an interface that wraps the Get method. @@ -341,7 +343,7 @@ func (s *store) restore() error { _, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0) if len(finishedCompactBytes) != 0 { s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main - log.Printf("mvcc: restore compact to %d", s.compactMainRev) + plog.Printf("restore compact to %d", s.compactMainRev) } // TODO: limit N to reduce max memory usage @@ -349,7 +351,7 @@ func (s *store) restore() error { for i, key := range keys { var kv mvccpb.KeyValue if err := kv.Unmarshal(vals[i]); err != nil { - log.Fatalf("mvcc: cannot unmarshal event: %v", err) + plog.Fatalf("cannot unmarshal event: %v", err) } rev := bytesToRev(key[:revBytesLen]) @@ -361,7 +363,7 @@ func (s *store) restore() error { if lease.LeaseID(kv.Lease) != lease.NoLease { err := s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}}) if err != nil && err != lease.ErrLeaseNotFound { - log.Fatalf("mvcc: unexpected Detach error %v", err) + plog.Fatalf("unexpected Detach error %v", err) } } default: @@ -398,7 +400,7 @@ func (s *store) restore() error { if scheduledCompact != 0 { s.Compact(scheduledCompact) - log.Printf("mvcc: resume scheduled compaction at %d", scheduledCompact) + plog.Printf("resume scheduled compaction at %d", scheduledCompact) } return nil @@ -450,12 +452,12 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []mvccpb. _, vs := s.tx.UnsafeRange(keyBucketName, start, end, 0) if len(vs) != 1 { - log.Fatalf("mvcc: range cannot find rev (%d,%d)", revpair.main, revpair.sub) + plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub) } var kv mvccpb.KeyValue if err := kv.Unmarshal(vs[0]); err != nil { - log.Fatalf("mvcc: cannot unmarshal event: %v", err) + plog.Fatalf("cannot unmarshal event: %v", err) } kvs = append(kvs, kv) if limit > 0 && len(kvs) >= int(limit) { @@ -480,7 +482,7 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) { _, vs := s.tx.UnsafeRange(keyBucketName, ibytes, nil, 0) var kv mvccpb.KeyValue if err = kv.Unmarshal(vs[0]); err != nil { - log.Fatalf("mvcc: cannot unmarshal value: %v", err) + plog.Fatalf("cannot unmarshal value: %v", err) } oldLease = lease.LeaseID(kv.Lease) } @@ -500,7 +502,7 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) { d, err := kv.Marshal() if err != nil { - log.Fatalf("mvcc: cannot marshal event: %v", err) + plog.Fatalf("cannot marshal event: %v", err) } s.tx.UnsafeSeqPut(keyBucketName, ibytes, d) @@ -561,13 +563,13 @@ func (s *store) delete(key []byte, rev revision) { d, err := kv.Marshal() if err != nil { - log.Fatalf("mvcc: cannot marshal event: %v", err) + plog.Fatalf("cannot marshal event: %v", err) } s.tx.UnsafeSeqPut(keyBucketName, ibytes, d) err = s.kvindex.Tombstone(key, revision{main: mainrev, sub: s.currentRev.sub}) if err != nil { - log.Fatalf("mvcc: cannot tombstone an existing key (%s): %v", string(key), err) + plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err) } s.changes = append(s.changes, kv) s.currentRev.sub += 1 @@ -578,13 +580,13 @@ func (s *store) delete(key []byte, rev revision) { kv.Reset() if err = kv.Unmarshal(vs[0]); err != nil { - log.Fatalf("mvcc: cannot unmarshal value: %v", err) + plog.Fatalf("cannot unmarshal value: %v", err) } if lease.LeaseID(kv.Lease) != lease.NoLease { err = s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}}) if err != nil { - log.Fatalf("mvcc: cannot detach %v", err) + plog.Fatalf("cannot detach %v", err) } } } @@ -622,7 +624,7 @@ func (s *store) ConsistentIndex() uint64 { // appendMarkTombstone appends tombstone mark to normal revision bytes. func appendMarkTombstone(b []byte) []byte { if len(b) != revBytesLen { - log.Panicf("cannot append mark to non normal revision bytes") + plog.Panicf("cannot append mark to non normal revision bytes") } return append(b, markTombstone) } diff --git a/mvcc/kvstore_bench_test.go b/mvcc/kvstore_bench_test.go index e1d7ecdd4..bf3752475 100644 --- a/mvcc/kvstore_bench_test.go +++ b/mvcc/kvstore_bench_test.go @@ -15,7 +15,6 @@ package mvcc import ( - "log" "sync/atomic" "testing" @@ -64,7 +63,7 @@ func BenchmarkStoreTxnPut(b *testing.B) { for i := 0; i < b.N; i++ { id := s.TxnBegin() if _, err := s.TxnPut(id, keys[i], vals[i], lease.NoLease); err != nil { - log.Fatalf("txn put error: %v", err) + plog.Fatalf("txn put error: %v", err) } s.TxnEnd(id) } diff --git a/mvcc/kvstore_compaction.go b/mvcc/kvstore_compaction.go index 70b4a388f..bbd38f547 100644 --- a/mvcc/kvstore_compaction.go +++ b/mvcc/kvstore_compaction.go @@ -48,6 +48,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc revToBytes(revision{main: compactMainRev}, rbytes) tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes) tx.Unlock() + plog.Printf("finished scheduled compaction at %d (took %v)", compactMainRev, time.Since(totalStart)) return true } diff --git a/mvcc/util.go b/mvcc/util.go index c2d1a3146..8a0df0bfc 100644 --- a/mvcc/util.go +++ b/mvcc/util.go @@ -16,7 +16,6 @@ package mvcc import ( "encoding/binary" - "log" "github.com/coreos/etcd/mvcc/backend" "github.com/coreos/etcd/mvcc/mvccpb" @@ -48,7 +47,7 @@ func WriteKV(be backend.Backend, kv mvccpb.KeyValue) { d, err := kv.Marshal() if err != nil { - log.Fatalf("mvcc: cannot marshal event: %v", err) + plog.Fatalf("cannot marshal event: %v", err) } be.BatchTx().Lock() diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index e32abf9e0..25c84ce99 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -15,7 +15,6 @@ package mvcc import ( - "log" "sync" "time" @@ -94,7 +93,7 @@ func (s *watchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64) rev = s.store.Put(key, value, lease) changes := s.store.getChanges() if len(changes) != 1 { - log.Panicf("unexpected len(changes) != 1 after put") + plog.Panicf("unexpected len(changes) != 1 after put") } ev := mvccpb.Event{ @@ -113,7 +112,7 @@ func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) { changes := s.store.getChanges() if len(changes) != int(n) { - log.Panicf("unexpected len(changes) != n after deleteRange") + plog.Panicf("unexpected len(changes) != n after deleteRange") } if n == 0 { @@ -432,7 +431,7 @@ func kvsToEvents(wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) { for i, v := range vals { var kv mvccpb.KeyValue if err := kv.Unmarshal(v); err != nil { - log.Panicf("mvcc: cannot unmarshal event: %v", err) + plog.Panicf("cannot unmarshal event: %v", err) } if !wg.contains(string(kv.Key)) { @@ -456,7 +455,7 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) { var victim watcherBatch for w, eb := range newWatcherBatch(&s.synced, evs) { if eb.revs != 1 { - log.Panicf("mvcc: unexpected multiple revisions in notification") + plog.Panicf("unexpected multiple revisions in notification") } select { case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: s.Rev()}: