Merge pull request #5426 from gyuho/log_compaction_done

mvcc: log when compaction is done
Gyu-Ho Lee 2016-05-21 09:33:50 -07:00
commit f20573b576
9 changed files with 50 additions and 51 deletions
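
The substance of the diff below is mechanical: calls through the standard library log package are replaced by a capnslog package logger (plog), and hand-written prefixes such as "mvcc: " and "backend: " are dropped from most messages because the logger already tags output with its package name. A minimal sketch of the pattern, not etcd code; log-level and formatter configuration are assumed to be handled by the host program:

package main

import "github.com/coreos/pkg/capnslog"

// Package-level logger mirroring the plog variables added in this commit;
// the second argument is the subpackage name that capnslog prepends to messages.
var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc")

func main() {
	rev := int64(3) // arbitrary revision, for illustration only
	// Before: log.Printf("mvcc: restore compact to %d", rev)
	// After:  the package prefix comes from the logger itself.
	plog.Printf("restore compact to %d", rev)
}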

View File

@@ -19,7 +19,6 @@ import (
"hash/crc32"
"io"
"io/ioutil"
- "log"
"os"
"path"
"sync"
@@ -27,6 +26,7 @@ import (
"time"
"github.com/boltdb/bolt"
+ "github.com/coreos/pkg/capnslog"
)
var (
@@ -39,6 +39,8 @@ var (
// the potential max db size can prevent writer from blocking reader.
// This only works for linux.
InitialMmapSize = int64(10 * 1024 * 1024 * 1024)
+ plog = capnslog.NewPackageLogger("github.com/coreos/etcd/mvcc", "backend")
)
const (
@@ -101,7 +103,7 @@ func NewDefaultBackend(path string) Backend {
func newBackend(path string, d time.Duration, limit int) *backend {
db, err := bolt.Open(path, 0600, boltOpenOptions)
if err != nil {
- log.Panicf("backend: cannot open database at %s (%v)", path, err)
+ plog.Panicf("cannot open database at %s (%v)", path, err)
}
b := &backend{
@@ -137,7 +139,7 @@ func (b *backend) Snapshot() Snapshot {
defer b.mu.RUnlock()
tx, err := b.db.Begin(false)
if err != nil {
- log.Fatalf("backend: cannot begin tx (%s)", err)
+ plog.Fatalf("cannot begin tx (%s)", err)
}
return &snapshot{tx}
}
@@ -244,24 +246,24 @@ func (b *backend) defrag() error {
err = b.db.Close()
if err != nil {
- log.Fatalf("backend: cannot close database (%s)", err)
+ plog.Fatalf("cannot close database (%s)", err)
}
err = tmpdb.Close()
if err != nil {
- log.Fatalf("backend: cannot close database (%s)", err)
+ plog.Fatalf("cannot close database (%s)", err)
}
err = os.Rename(tdbp, dbp)
if err != nil {
- log.Fatalf("backend: cannot rename database (%s)", err)
+ plog.Fatalf("cannot rename database (%s)", err)
}
b.db, err = bolt.Open(dbp, 0600, boltOpenOptions)
if err != nil {
- log.Panicf("backend: cannot open database at %s (%v)", dbp, err)
+ plog.Panicf("cannot open database at %s (%v)", dbp, err)
}
b.batchTx.tx, err = b.db.Begin(true)
if err != nil {
- log.Fatalf("backend: cannot begin tx (%s)", err)
+ plog.Fatalf("cannot begin tx (%s)", err)
}
return nil
@@ -320,7 +322,7 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) {
dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")
if err != nil {
- log.Fatal(err)
+ plog.Fatal(err)
}
tmpPath := path.Join(dir, "database")
return newBackend(tmpPath, batchInterval, batchLimit), tmpPath

View File

@@ -16,7 +16,6 @@ package backend
import (
"bytes"
- "log"
"sync"
"sync/atomic"
"time"
@@ -53,7 +52,7 @@ func newBatchTx(backend *backend) *batchTx {
func (t *batchTx) UnsafeCreateBucket(name []byte) {
_, err := t.tx.CreateBucket(name)
if err != nil && err != bolt.ErrBucketExists {
- log.Fatalf("mvcc: cannot create bucket %s (%v)", name, err)
+ plog.Fatalf("cannot create bucket %s (%v)", name, err)
}
t.pending++
}
@@ -71,7 +70,7 @@ func (t *batchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
bucket := t.tx.Bucket(bucketName)
if bucket == nil {
- log.Fatalf("mvcc: bucket %s does not exist", bucketName)
+ plog.Fatalf("bucket %s does not exist", bucketName)
}
if seq {
// it is useful to increase fill percent when the workloads are mostly append-only.
@@ -79,7 +78,7 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo
bucket.FillPercent = 0.9
}
if err := bucket.Put(key, value); err != nil {
- log.Fatalf("mvcc: cannot put key into bucket (%v)", err)
+ plog.Fatalf("cannot put key into bucket (%v)", err)
}
t.pending++
}
@@ -88,7 +87,7 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo
func (t *batchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
bucket := t.tx.Bucket(bucketName)
if bucket == nil {
- log.Fatalf("mvcc: bucket %s does not exist", bucketName)
+ plog.Fatalf("bucket %s does not exist", bucketName)
}
if len(endKey) == 0 {
@@ -115,11 +114,11 @@ func (t *batchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64
func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
bucket := t.tx.Bucket(bucketName)
if bucket == nil {
- log.Fatalf("mvcc: bucket %s does not exist", bucketName)
+ plog.Fatalf("bucket %s does not exist", bucketName)
}
err := bucket.Delete(key)
if err != nil {
- log.Fatalf("mvcc: cannot delete key from bucket (%v)", err)
+ plog.Fatalf("cannot delete key from bucket (%v)", err)
}
t.pending++
}
@@ -173,7 +172,7 @@ func (t *batchTx) commit(stop bool) {
t.pending = 0
if err != nil {
- log.Fatalf("mvcc: cannot commit tx (%s)", err)
+ plog.Fatalf("cannot commit tx (%s)", err)
}
}
@@ -186,7 +185,7 @@ func (t *batchTx) commit(stop bool) {
// begin a new tx
t.tx, err = t.backend.db.Begin(true)
if err != nil {
- log.Fatalf("mvcc: cannot begin tx (%s)", err)
+ plog.Fatalf("cannot begin tx (%s)", err)
}
atomic.StoreInt64(&t.backend.size, t.tx.Size())
}

View File

@@ -15,7 +15,6 @@
package mvcc
import (
- "log"
"sort"
"sync"
@@ -169,7 +168,7 @@ func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision {
func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
available := make(map[revision]struct{})
var emptyki []*keyIndex
- log.Printf("store.index: compact %d", rev)
+ plog.Printf("store.index: compact %d", rev)
// TODO: do not hold the lock for long time?
// This is probably OK. Compacting 10M keys takes O(10ms).
ti.Lock()
@@ -178,7 +177,7 @@ func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
for _, ki := range emptyki {
item := ti.tree.Delete(ki)
if item == nil {
- log.Panic("store.index: unexpected delete failure during compaction")
+ plog.Panic("store.index: unexpected delete failure during compaction")
}
}
return available

View File

@@ -18,7 +18,6 @@ import (
"bytes"
"errors"
"fmt"
- "log"
"github.com/google/btree"
)
@@ -78,7 +77,7 @@ func (ki *keyIndex) put(main int64, sub int64) {
rev := revision{main: main, sub: sub}
if !rev.GreaterThan(ki.modified) {
- log.Panicf("store.keyindex: put with unexpected smaller revision [%v / %v]", rev, ki.modified)
+ plog.Panicf("store.keyindex: put with unexpected smaller revision [%v / %v]", rev, ki.modified)
}
if len(ki.generations) == 0 {
ki.generations = append(ki.generations, generation{})
@@ -95,7 +94,7 @@ func (ki *keyIndex) put(main int64, sub int64) {
func (ki *keyIndex) restore(created, modified revision, ver int64) {
if len(ki.generations) != 0 {
- log.Panicf("store.keyindex: cannot restore non-empty keyIndex")
+ plog.Panicf("store.keyindex: cannot restore non-empty keyIndex")
}
ki.modified = modified
@@ -109,7 +108,7 @@ func (ki *keyIndex) restore(created, modified revision, ver int64) {
// It returns ErrRevisionNotFound when tombstone on an empty generation.
func (ki *keyIndex) tombstone(main int64, sub int64) error {
if ki.isEmpty() {
- log.Panicf("store.keyindex: unexpected tombstone on empty keyIndex %s", string(ki.key))
+ plog.Panicf("store.keyindex: unexpected tombstone on empty keyIndex %s", string(ki.key))
}
if ki.generations[len(ki.generations)-1].isEmpty() {
return ErrRevisionNotFound
@@ -124,7 +123,7 @@ func (ki *keyIndex) tombstone(main int64, sub int64) error {
// Rev must be higher than or equal to the given atRev.
func (ki *keyIndex) get(atRev int64) (modified, created revision, ver int64, err error) {
if ki.isEmpty() {
- log.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
+ plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
}
g := ki.findGeneration(atRev)
if g.isEmpty() {
@@ -144,7 +143,7 @@ func (ki *keyIndex) get(atRev int64) (modified, created revision, ver int64, err
// main revision.
func (ki *keyIndex) since(rev int64) []revision {
if ki.isEmpty() {
- log.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
+ plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
}
since := revision{rev, 0}
var gi int
@@ -185,7 +184,7 @@ func (ki *keyIndex) since(rev int64) []revision {
// If a generation becomes empty during compaction, it will be removed.
func (ki *keyIndex) compact(atRev int64, available map[revision]struct{}) {
if ki.isEmpty() {
- log.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key))
+ plog.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key))
}
// walk until reaching the first revision that has an revision smaller or equal to

View File

@@ -17,7 +17,6 @@ package mvcc
import (
"encoding/binary"
"errors"
- "log"
"math"
"math/rand"
"sync"
@@ -27,6 +26,7 @@ import (
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/coreos/etcd/pkg/schedule"
+ "github.com/coreos/pkg/capnslog"
"golang.org/x/net/context"
)
@@ -49,6 +49,8 @@ var (
ErrCompacted = errors.New("mvcc: required revision has been compacted")
ErrFutureRev = errors.New("mvcc: required revision is a future revision")
ErrCanceled = errors.New("mvcc: watcher is canceled")
+ plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc")
)
// ConsistentIndexGetter is an interface that wraps the Get method.
@@ -341,7 +343,7 @@ func (s *store) restore() error {
_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
if len(finishedCompactBytes) != 0 {
s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
- log.Printf("mvcc: restore compact to %d", s.compactMainRev)
+ plog.Printf("restore compact to %d", s.compactMainRev)
}
// TODO: limit N to reduce max memory usage
@@ -349,7 +351,7 @@ func (s *store) restore() error {
for i, key := range keys {
var kv mvccpb.KeyValue
if err := kv.Unmarshal(vals[i]); err != nil {
- log.Fatalf("mvcc: cannot unmarshal event: %v", err)
+ plog.Fatalf("cannot unmarshal event: %v", err)
}
rev := bytesToRev(key[:revBytesLen])
@@ -361,7 +363,7 @@ func (s *store) restore() error {
if lease.LeaseID(kv.Lease) != lease.NoLease {
err := s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
if err != nil && err != lease.ErrLeaseNotFound {
- log.Fatalf("mvcc: unexpected Detach error %v", err)
+ plog.Fatalf("unexpected Detach error %v", err)
}
}
default:
@@ -398,7 +400,7 @@ func (s *store) restore() error {
if scheduledCompact != 0 {
s.Compact(scheduledCompact)
- log.Printf("mvcc: resume scheduled compaction at %d", scheduledCompact)
+ plog.Printf("resume scheduled compaction at %d", scheduledCompact)
}
return nil
@@ -450,12 +452,12 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []mvccpb.
_, vs := s.tx.UnsafeRange(keyBucketName, start, end, 0)
if len(vs) != 1 {
- log.Fatalf("mvcc: range cannot find rev (%d,%d)", revpair.main, revpair.sub)
+ plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
}
var kv mvccpb.KeyValue
if err := kv.Unmarshal(vs[0]); err != nil {
- log.Fatalf("mvcc: cannot unmarshal event: %v", err)
+ plog.Fatalf("cannot unmarshal event: %v", err)
}
kvs = append(kvs, kv)
if limit > 0 && len(kvs) >= int(limit) {
@@ -480,7 +482,7 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
_, vs := s.tx.UnsafeRange(keyBucketName, ibytes, nil, 0)
var kv mvccpb.KeyValue
if err = kv.Unmarshal(vs[0]); err != nil {
- log.Fatalf("mvcc: cannot unmarshal value: %v", err)
+ plog.Fatalf("cannot unmarshal value: %v", err)
}
oldLease = lease.LeaseID(kv.Lease)
}
@@ -500,7 +502,7 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
d, err := kv.Marshal()
if err != nil {
- log.Fatalf("mvcc: cannot marshal event: %v", err)
+ plog.Fatalf("cannot marshal event: %v", err)
}
s.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
@@ -561,13 +563,13 @@ func (s *store) delete(key []byte, rev revision) {
d, err := kv.Marshal()
if err != nil {
- log.Fatalf("mvcc: cannot marshal event: %v", err)
+ plog.Fatalf("cannot marshal event: %v", err)
}
s.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
err = s.kvindex.Tombstone(key, revision{main: mainrev, sub: s.currentRev.sub})
if err != nil {
- log.Fatalf("mvcc: cannot tombstone an existing key (%s): %v", string(key), err)
+ plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
}
s.changes = append(s.changes, kv)
s.currentRev.sub += 1
@@ -578,13 +580,13 @@ func (s *store) delete(key []byte, rev revision) {
kv.Reset()
if err = kv.Unmarshal(vs[0]); err != nil {
- log.Fatalf("mvcc: cannot unmarshal value: %v", err)
+ plog.Fatalf("cannot unmarshal value: %v", err)
}
if lease.LeaseID(kv.Lease) != lease.NoLease {
err = s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
if err != nil {
- log.Fatalf("mvcc: cannot detach %v", err)
+ plog.Fatalf("cannot detach %v", err)
}
}
}
@@ -622,7 +624,7 @@ func (s *store) ConsistentIndex() uint64 {
// appendMarkTombstone appends tombstone mark to normal revision bytes.
func appendMarkTombstone(b []byte) []byte {
if len(b) != revBytesLen {
- log.Panicf("cannot append mark to non normal revision bytes")
+ plog.Panicf("cannot append mark to non normal revision bytes")
}
return append(b, markTombstone)
}

View File

@@ -15,7 +15,6 @@
package mvcc
import (
- "log"
"sync/atomic"
"testing"
@@ -64,7 +63,7 @@ func BenchmarkStoreTxnPut(b *testing.B) {
for i := 0; i < b.N; i++ {
id := s.TxnBegin()
if _, err := s.TxnPut(id, keys[i], vals[i], lease.NoLease); err != nil {
- log.Fatalf("txn put error: %v", err)
+ plog.Fatalf("txn put error: %v", err)
}
s.TxnEnd(id)
}

View File

@@ -48,6 +48,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
revToBytes(revision{main: compactMainRev}, rbytes)
tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes)
tx.Unlock()
+ plog.Printf("finished scheduled compaction at %d (took %v)", compactMainRev, time.Since(totalStart))
return true
}
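
This hunk is what the commit title refers to: scheduleCompaction now reports when a scheduled compaction finishes and how long it took, using time.Since against the totalStart timestamp recorded before the compaction work. A tiny stand-alone sketch of that timing pattern (fmt stands in for plog, and a sleep stands in for the batched deletion work):

package main

import (
	"fmt"
	"time"
)

func main() {
	compactMainRev := int64(9)        // arbitrary revision, for illustration only
	totalStart := time.Now()          // same role as totalStart in scheduleCompaction
	time.Sleep(50 * time.Millisecond) // stand-in for deleting compacted revisions
	fmt.Printf("finished scheduled compaction at %d (took %v)\n",
		compactMainRev, time.Since(totalStart))
}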

View File

@@ -16,7 +16,6 @@ package mvcc
import (
"encoding/binary"
- "log"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/mvcc/mvccpb"
@@ -48,7 +47,7 @@ func WriteKV(be backend.Backend, kv mvccpb.KeyValue) {
d, err := kv.Marshal()
if err != nil {
- log.Fatalf("mvcc: cannot marshal event: %v", err)
+ plog.Fatalf("cannot marshal event: %v", err)
}
be.BatchTx().Lock()

View File

@@ -15,7 +15,6 @@
package mvcc
import (
- "log"
"sync"
"time"
@@ -94,7 +93,7 @@ func (s *watchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64)
rev = s.store.Put(key, value, lease)
changes := s.store.getChanges()
if len(changes) != 1 {
- log.Panicf("unexpected len(changes) != 1 after put")
+ plog.Panicf("unexpected len(changes) != 1 after put")
}
ev := mvccpb.Event{
@@ -113,7 +112,7 @@ func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) {
changes := s.store.getChanges()
if len(changes) != int(n) {
- log.Panicf("unexpected len(changes) != n after deleteRange")
+ plog.Panicf("unexpected len(changes) != n after deleteRange")
}
if n == 0 {
@@ -432,7 +431,7 @@ func kvsToEvents(wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
for i, v := range vals {
var kv mvccpb.KeyValue
if err := kv.Unmarshal(v); err != nil {
- log.Panicf("mvcc: cannot unmarshal event: %v", err)
+ plog.Panicf("cannot unmarshal event: %v", err)
}
if !wg.contains(string(kv.Key)) {
@@ -456,7 +455,7 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
var victim watcherBatch
for w, eb := range newWatcherBatch(&s.synced, evs) {
if eb.revs != 1 {
- log.Panicf("mvcc: unexpected multiple revisions in notification")
+ plog.Panicf("unexpected multiple revisions in notification")
}
select {
case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: s.Rev()}: