mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
mvcc: use capnslog
This commit is contained in:
parent
4882330fd7
commit
bf8cf39daf
@ -19,7 +19,6 @@ import (
|
||||
"hash/crc32"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
@ -27,6 +26,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -39,6 +39,8 @@ var (
|
||||
// the potential max db size can prevent writer from blocking reader.
|
||||
// This only works for linux.
|
||||
InitialMmapSize = int64(10 * 1024 * 1024 * 1024)
|
||||
|
||||
plog = capnslog.NewPackageLogger("github.com/coreos/etcd/mvcc", "backend")
|
||||
)
|
||||
|
||||
const (
|
||||
@ -101,7 +103,7 @@ func NewDefaultBackend(path string) Backend {
|
||||
func newBackend(path string, d time.Duration, limit int) *backend {
|
||||
db, err := bolt.Open(path, 0600, boltOpenOptions)
|
||||
if err != nil {
|
||||
log.Panicf("backend: cannot open database at %s (%v)", path, err)
|
||||
plog.Panicf("cannot open database at %s (%v)", path, err)
|
||||
}
|
||||
|
||||
b := &backend{
|
||||
@ -137,7 +139,7 @@ func (b *backend) Snapshot() Snapshot {
|
||||
defer b.mu.RUnlock()
|
||||
tx, err := b.db.Begin(false)
|
||||
if err != nil {
|
||||
log.Fatalf("backend: cannot begin tx (%s)", err)
|
||||
plog.Fatalf("cannot begin tx (%s)", err)
|
||||
}
|
||||
return &snapshot{tx}
|
||||
}
|
||||
@ -244,24 +246,24 @@ func (b *backend) defrag() error {
|
||||
|
||||
err = b.db.Close()
|
||||
if err != nil {
|
||||
log.Fatalf("backend: cannot close database (%s)", err)
|
||||
plog.Fatalf("cannot close database (%s)", err)
|
||||
}
|
||||
err = tmpdb.Close()
|
||||
if err != nil {
|
||||
log.Fatalf("backend: cannot close database (%s)", err)
|
||||
plog.Fatalf("cannot close database (%s)", err)
|
||||
}
|
||||
err = os.Rename(tdbp, dbp)
|
||||
if err != nil {
|
||||
log.Fatalf("backend: cannot rename database (%s)", err)
|
||||
plog.Fatalf("cannot rename database (%s)", err)
|
||||
}
|
||||
|
||||
b.db, err = bolt.Open(dbp, 0600, boltOpenOptions)
|
||||
if err != nil {
|
||||
log.Panicf("backend: cannot open database at %s (%v)", dbp, err)
|
||||
plog.Panicf("cannot open database at %s (%v)", dbp, err)
|
||||
}
|
||||
b.batchTx.tx, err = b.db.Begin(true)
|
||||
if err != nil {
|
||||
log.Fatalf("backend: cannot begin tx (%s)", err)
|
||||
plog.Fatalf("cannot begin tx (%s)", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -320,7 +322,7 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
|
||||
func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) {
|
||||
dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
plog.Fatal(err)
|
||||
}
|
||||
tmpPath := path.Join(dir, "database")
|
||||
return newBackend(tmpPath, batchInterval, batchLimit), tmpPath
|
||||
|
@ -16,7 +16,6 @@ package backend
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"log"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@ -53,7 +52,7 @@ func newBatchTx(backend *backend) *batchTx {
|
||||
func (t *batchTx) UnsafeCreateBucket(name []byte) {
|
||||
_, err := t.tx.CreateBucket(name)
|
||||
if err != nil && err != bolt.ErrBucketExists {
|
||||
log.Fatalf("mvcc: cannot create bucket %s (%v)", name, err)
|
||||
plog.Fatalf("cannot create bucket %s (%v)", name, err)
|
||||
}
|
||||
t.pending++
|
||||
}
|
||||
@ -71,7 +70,7 @@ func (t *batchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
|
||||
func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
|
||||
bucket := t.tx.Bucket(bucketName)
|
||||
if bucket == nil {
|
||||
log.Fatalf("mvcc: bucket %s does not exist", bucketName)
|
||||
plog.Fatalf("bucket %s does not exist", bucketName)
|
||||
}
|
||||
if seq {
|
||||
// it is useful to increase fill percent when the workloads are mostly append-only.
|
||||
@ -79,7 +78,7 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo
|
||||
bucket.FillPercent = 0.9
|
||||
}
|
||||
if err := bucket.Put(key, value); err != nil {
|
||||
log.Fatalf("mvcc: cannot put key into bucket (%v)", err)
|
||||
plog.Fatalf("cannot put key into bucket (%v)", err)
|
||||
}
|
||||
t.pending++
|
||||
}
|
||||
@ -88,7 +87,7 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo
|
||||
func (t *batchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
|
||||
bucket := t.tx.Bucket(bucketName)
|
||||
if bucket == nil {
|
||||
log.Fatalf("mvcc: bucket %s does not exist", bucketName)
|
||||
plog.Fatalf("bucket %s does not exist", bucketName)
|
||||
}
|
||||
|
||||
if len(endKey) == 0 {
|
||||
@ -115,11 +114,11 @@ func (t *batchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64
|
||||
func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
|
||||
bucket := t.tx.Bucket(bucketName)
|
||||
if bucket == nil {
|
||||
log.Fatalf("mvcc: bucket %s does not exist", bucketName)
|
||||
plog.Fatalf("bucket %s does not exist", bucketName)
|
||||
}
|
||||
err := bucket.Delete(key)
|
||||
if err != nil {
|
||||
log.Fatalf("mvcc: cannot delete key from bucket (%v)", err)
|
||||
plog.Fatalf("cannot delete key from bucket (%v)", err)
|
||||
}
|
||||
t.pending++
|
||||
}
|
||||
@ -173,7 +172,7 @@ func (t *batchTx) commit(stop bool) {
|
||||
|
||||
t.pending = 0
|
||||
if err != nil {
|
||||
log.Fatalf("mvcc: cannot commit tx (%s)", err)
|
||||
plog.Fatalf("cannot commit tx (%s)", err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -186,7 +185,7 @@ func (t *batchTx) commit(stop bool) {
|
||||
// begin a new tx
|
||||
t.tx, err = t.backend.db.Begin(true)
|
||||
if err != nil {
|
||||
log.Fatalf("mvcc: cannot begin tx (%s)", err)
|
||||
plog.Fatalf("cannot begin tx (%s)", err)
|
||||
}
|
||||
atomic.StoreInt64(&t.backend.size, t.tx.Size())
|
||||
}
|
||||
|
@ -15,7 +15,6 @@
|
||||
package mvcc
|
||||
|
||||
import (
|
||||
"log"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
@ -169,7 +168,7 @@ func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision {
|
||||
func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
|
||||
available := make(map[revision]struct{})
|
||||
var emptyki []*keyIndex
|
||||
log.Printf("store.index: compact %d", rev)
|
||||
plog.Printf("store.index: compact %d", rev)
|
||||
// TODO: do not hold the lock for long time?
|
||||
// This is probably OK. Compacting 10M keys takes O(10ms).
|
||||
ti.Lock()
|
||||
@ -178,7 +177,7 @@ func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
|
||||
for _, ki := range emptyki {
|
||||
item := ti.tree.Delete(ki)
|
||||
if item == nil {
|
||||
log.Panic("store.index: unexpected delete failure during compaction")
|
||||
plog.Panic("store.index: unexpected delete failure during compaction")
|
||||
}
|
||||
}
|
||||
return available
|
||||
|
@ -18,7 +18,6 @@ import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/google/btree"
|
||||
)
|
||||
@ -78,7 +77,7 @@ func (ki *keyIndex) put(main int64, sub int64) {
|
||||
rev := revision{main: main, sub: sub}
|
||||
|
||||
if !rev.GreaterThan(ki.modified) {
|
||||
log.Panicf("store.keyindex: put with unexpected smaller revision [%v / %v]", rev, ki.modified)
|
||||
plog.Panicf("store.keyindex: put with unexpected smaller revision [%v / %v]", rev, ki.modified)
|
||||
}
|
||||
if len(ki.generations) == 0 {
|
||||
ki.generations = append(ki.generations, generation{})
|
||||
@ -95,7 +94,7 @@ func (ki *keyIndex) put(main int64, sub int64) {
|
||||
|
||||
func (ki *keyIndex) restore(created, modified revision, ver int64) {
|
||||
if len(ki.generations) != 0 {
|
||||
log.Panicf("store.keyindex: cannot restore non-empty keyIndex")
|
||||
plog.Panicf("store.keyindex: cannot restore non-empty keyIndex")
|
||||
}
|
||||
|
||||
ki.modified = modified
|
||||
@ -109,7 +108,7 @@ func (ki *keyIndex) restore(created, modified revision, ver int64) {
|
||||
// It returns ErrRevisionNotFound when tombstone on an empty generation.
|
||||
func (ki *keyIndex) tombstone(main int64, sub int64) error {
|
||||
if ki.isEmpty() {
|
||||
log.Panicf("store.keyindex: unexpected tombstone on empty keyIndex %s", string(ki.key))
|
||||
plog.Panicf("store.keyindex: unexpected tombstone on empty keyIndex %s", string(ki.key))
|
||||
}
|
||||
if ki.generations[len(ki.generations)-1].isEmpty() {
|
||||
return ErrRevisionNotFound
|
||||
@ -124,7 +123,7 @@ func (ki *keyIndex) tombstone(main int64, sub int64) error {
|
||||
// Rev must be higher than or equal to the given atRev.
|
||||
func (ki *keyIndex) get(atRev int64) (modified, created revision, ver int64, err error) {
|
||||
if ki.isEmpty() {
|
||||
log.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
|
||||
plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
|
||||
}
|
||||
g := ki.findGeneration(atRev)
|
||||
if g.isEmpty() {
|
||||
@ -144,7 +143,7 @@ func (ki *keyIndex) get(atRev int64) (modified, created revision, ver int64, err
|
||||
// main revision.
|
||||
func (ki *keyIndex) since(rev int64) []revision {
|
||||
if ki.isEmpty() {
|
||||
log.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
|
||||
plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
|
||||
}
|
||||
since := revision{rev, 0}
|
||||
var gi int
|
||||
@ -185,7 +184,7 @@ func (ki *keyIndex) since(rev int64) []revision {
|
||||
// If a generation becomes empty during compaction, it will be removed.
|
||||
func (ki *keyIndex) compact(atRev int64, available map[revision]struct{}) {
|
||||
if ki.isEmpty() {
|
||||
log.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key))
|
||||
plog.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key))
|
||||
}
|
||||
|
||||
// walk until reaching the first revision that has an revision smaller or equal to
|
||||
|
@ -17,7 +17,6 @@ package mvcc
|
||||
import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"log"
|
||||
"math"
|
||||
"math/rand"
|
||||
"sync"
|
||||
@ -27,6 +26,7 @@ import (
|
||||
"github.com/coreos/etcd/mvcc/backend"
|
||||
"github.com/coreos/etcd/mvcc/mvccpb"
|
||||
"github.com/coreos/etcd/pkg/schedule"
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
@ -49,6 +49,8 @@ var (
|
||||
ErrCompacted = errors.New("mvcc: required revision has been compacted")
|
||||
ErrFutureRev = errors.New("mvcc: required revision is a future revision")
|
||||
ErrCanceled = errors.New("mvcc: watcher is canceled")
|
||||
|
||||
plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc")
|
||||
)
|
||||
|
||||
// ConsistentIndexGetter is an interface that wraps the Get method.
|
||||
@ -341,7 +343,7 @@ func (s *store) restore() error {
|
||||
_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
|
||||
if len(finishedCompactBytes) != 0 {
|
||||
s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
|
||||
log.Printf("mvcc: restore compact to %d", s.compactMainRev)
|
||||
plog.Printf("restore compact to %d", s.compactMainRev)
|
||||
}
|
||||
|
||||
// TODO: limit N to reduce max memory usage
|
||||
@ -349,7 +351,7 @@ func (s *store) restore() error {
|
||||
for i, key := range keys {
|
||||
var kv mvccpb.KeyValue
|
||||
if err := kv.Unmarshal(vals[i]); err != nil {
|
||||
log.Fatalf("mvcc: cannot unmarshal event: %v", err)
|
||||
plog.Fatalf("cannot unmarshal event: %v", err)
|
||||
}
|
||||
|
||||
rev := bytesToRev(key[:revBytesLen])
|
||||
@ -361,7 +363,7 @@ func (s *store) restore() error {
|
||||
if lease.LeaseID(kv.Lease) != lease.NoLease {
|
||||
err := s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
|
||||
if err != nil && err != lease.ErrLeaseNotFound {
|
||||
log.Fatalf("mvcc: unexpected Detach error %v", err)
|
||||
plog.Fatalf("unexpected Detach error %v", err)
|
||||
}
|
||||
}
|
||||
default:
|
||||
@ -398,7 +400,7 @@ func (s *store) restore() error {
|
||||
|
||||
if scheduledCompact != 0 {
|
||||
s.Compact(scheduledCompact)
|
||||
log.Printf("mvcc: resume scheduled compaction at %d", scheduledCompact)
|
||||
plog.Printf("resume scheduled compaction at %d", scheduledCompact)
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -450,12 +452,12 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []mvccpb.
|
||||
|
||||
_, vs := s.tx.UnsafeRange(keyBucketName, start, end, 0)
|
||||
if len(vs) != 1 {
|
||||
log.Fatalf("mvcc: range cannot find rev (%d,%d)", revpair.main, revpair.sub)
|
||||
plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
|
||||
}
|
||||
|
||||
var kv mvccpb.KeyValue
|
||||
if err := kv.Unmarshal(vs[0]); err != nil {
|
||||
log.Fatalf("mvcc: cannot unmarshal event: %v", err)
|
||||
plog.Fatalf("cannot unmarshal event: %v", err)
|
||||
}
|
||||
kvs = append(kvs, kv)
|
||||
if limit > 0 && len(kvs) >= int(limit) {
|
||||
@ -480,7 +482,7 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
|
||||
_, vs := s.tx.UnsafeRange(keyBucketName, ibytes, nil, 0)
|
||||
var kv mvccpb.KeyValue
|
||||
if err = kv.Unmarshal(vs[0]); err != nil {
|
||||
log.Fatalf("mvcc: cannot unmarshal value: %v", err)
|
||||
plog.Fatalf("cannot unmarshal value: %v", err)
|
||||
}
|
||||
oldLease = lease.LeaseID(kv.Lease)
|
||||
}
|
||||
@ -500,7 +502,7 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
|
||||
|
||||
d, err := kv.Marshal()
|
||||
if err != nil {
|
||||
log.Fatalf("mvcc: cannot marshal event: %v", err)
|
||||
plog.Fatalf("cannot marshal event: %v", err)
|
||||
}
|
||||
|
||||
s.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
|
||||
@ -561,13 +563,13 @@ func (s *store) delete(key []byte, rev revision) {
|
||||
|
||||
d, err := kv.Marshal()
|
||||
if err != nil {
|
||||
log.Fatalf("mvcc: cannot marshal event: %v", err)
|
||||
plog.Fatalf("cannot marshal event: %v", err)
|
||||
}
|
||||
|
||||
s.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
|
||||
err = s.kvindex.Tombstone(key, revision{main: mainrev, sub: s.currentRev.sub})
|
||||
if err != nil {
|
||||
log.Fatalf("mvcc: cannot tombstone an existing key (%s): %v", string(key), err)
|
||||
plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
|
||||
}
|
||||
s.changes = append(s.changes, kv)
|
||||
s.currentRev.sub += 1
|
||||
@ -578,13 +580,13 @@ func (s *store) delete(key []byte, rev revision) {
|
||||
|
||||
kv.Reset()
|
||||
if err = kv.Unmarshal(vs[0]); err != nil {
|
||||
log.Fatalf("mvcc: cannot unmarshal value: %v", err)
|
||||
plog.Fatalf("cannot unmarshal value: %v", err)
|
||||
}
|
||||
|
||||
if lease.LeaseID(kv.Lease) != lease.NoLease {
|
||||
err = s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
|
||||
if err != nil {
|
||||
log.Fatalf("mvcc: cannot detach %v", err)
|
||||
plog.Fatalf("cannot detach %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -622,7 +624,7 @@ func (s *store) ConsistentIndex() uint64 {
|
||||
// appendMarkTombstone appends tombstone mark to normal revision bytes.
|
||||
func appendMarkTombstone(b []byte) []byte {
|
||||
if len(b) != revBytesLen {
|
||||
log.Panicf("cannot append mark to non normal revision bytes")
|
||||
plog.Panicf("cannot append mark to non normal revision bytes")
|
||||
}
|
||||
return append(b, markTombstone)
|
||||
}
|
||||
|
@ -15,7 +15,6 @@
|
||||
package mvcc
|
||||
|
||||
import (
|
||||
"log"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
@ -64,7 +63,7 @@ func BenchmarkStoreTxnPut(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
id := s.TxnBegin()
|
||||
if _, err := s.TxnPut(id, keys[i], vals[i], lease.NoLease); err != nil {
|
||||
log.Fatalf("txn put error: %v", err)
|
||||
plog.Fatalf("txn put error: %v", err)
|
||||
}
|
||||
s.TxnEnd(id)
|
||||
}
|
||||
|
@ -48,6 +48,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
|
||||
revToBytes(revision{main: compactMainRev}, rbytes)
|
||||
tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes)
|
||||
tx.Unlock()
|
||||
plog.Printf("finished scheduled compaction at %d (took %v)", compactMainRev, time.Since(totalStart))
|
||||
return true
|
||||
}
|
||||
|
||||
|
@ -16,7 +16,6 @@ package mvcc
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"log"
|
||||
|
||||
"github.com/coreos/etcd/mvcc/backend"
|
||||
"github.com/coreos/etcd/mvcc/mvccpb"
|
||||
@ -48,7 +47,7 @@ func WriteKV(be backend.Backend, kv mvccpb.KeyValue) {
|
||||
|
||||
d, err := kv.Marshal()
|
||||
if err != nil {
|
||||
log.Fatalf("mvcc: cannot marshal event: %v", err)
|
||||
plog.Fatalf("cannot marshal event: %v", err)
|
||||
}
|
||||
|
||||
be.BatchTx().Lock()
|
||||
|
@ -15,7 +15,6 @@
|
||||
package mvcc
|
||||
|
||||
import (
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -94,7 +93,7 @@ func (s *watchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64)
|
||||
rev = s.store.Put(key, value, lease)
|
||||
changes := s.store.getChanges()
|
||||
if len(changes) != 1 {
|
||||
log.Panicf("unexpected len(changes) != 1 after put")
|
||||
plog.Panicf("unexpected len(changes) != 1 after put")
|
||||
}
|
||||
|
||||
ev := mvccpb.Event{
|
||||
@ -113,7 +112,7 @@ func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) {
|
||||
changes := s.store.getChanges()
|
||||
|
||||
if len(changes) != int(n) {
|
||||
log.Panicf("unexpected len(changes) != n after deleteRange")
|
||||
plog.Panicf("unexpected len(changes) != n after deleteRange")
|
||||
}
|
||||
|
||||
if n == 0 {
|
||||
@ -432,7 +431,7 @@ func kvsToEvents(wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
|
||||
for i, v := range vals {
|
||||
var kv mvccpb.KeyValue
|
||||
if err := kv.Unmarshal(v); err != nil {
|
||||
log.Panicf("mvcc: cannot unmarshal event: %v", err)
|
||||
plog.Panicf("cannot unmarshal event: %v", err)
|
||||
}
|
||||
|
||||
if !wg.contains(string(kv.Key)) {
|
||||
@ -456,7 +455,7 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
|
||||
var victim watcherBatch
|
||||
for w, eb := range newWatcherBatch(&s.synced, evs) {
|
||||
if eb.revs != 1 {
|
||||
log.Panicf("mvcc: unexpected multiple revisions in notification")
|
||||
plog.Panicf("unexpected multiple revisions in notification")
|
||||
}
|
||||
select {
|
||||
case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: s.Rev()}:
|
||||
|
Loading…
x
Reference in New Issue
Block a user