Mirror of https://github.com/etcd-io/etcd.git (synced 2024-09-27 06:25:44 +00:00)

Merge pull request #9697 from gyuho/log-log-log

*: add more structured logger

This commit is contained in commit 72cc355c13.
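The diff below threads a structured zap logger (an `lg *zap.Logger` field or parameter) through the mvcc and transport code, falling back to the legacy capnslog-based `plog` whenever no structured logger is configured. The following is a minimal, self-contained sketch of that dual-logger pattern, not the actual etcd code: the `backend` struct and `plog` variable here are stand-ins, and `Warn` is used where the real code calls `Fatal` so the example can run to completion.

package main

import (
	"errors"
	"log"
	"os"

	"go.uber.org/zap"
)

// plog stands in for etcd's legacy capnslog package logger.
var plog = log.New(os.Stderr, "mvcc: ", log.LstdFlags)

// backend is a stand-in for the etcd types that gain an lg field in this PR.
type backend struct {
	lg *zap.Logger // nil means structured logging is not configured
}

// logBeginTxFailure mirrors the pattern the diff introduces throughout:
// prefer the structured logger when present, otherwise fall back to plog.
func (b *backend) logBeginTxFailure(err error) {
	if b.lg != nil {
		b.lg.Warn("failed to begin tx", zap.Error(err))
	} else {
		plog.Printf("cannot begin tx (%s)", err)
	}
}

func main() {
	err := errors.New("disk full")
	(&backend{lg: zap.NewExample()}).logBeginTxFailure(err) // structured JSON output
	(&backend{lg: nil}).logBeginTxFailure(err)              // legacy printf-style output
}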
@@ -24,6 +24,7 @@ import (
 "github.com/coreos/etcd/mvcc/backend"
 "github.com/coreos/etcd/raft/raftpb"
 "github.com/coreos/etcd/raftsnap"
+
 "go.uber.org/zap"
 )
 
@@ -56,10 +56,10 @@ import (
 "github.com/coreos/etcd/raftsnap"
 "github.com/coreos/etcd/version"
 "github.com/coreos/etcd/wal"
-humanize "github.com/dustin/go-humanize"
 
 "github.com/coreos/go-semver/semver"
 "github.com/coreos/pkg/capnslog"
+humanize "github.com/dustin/go-humanize"
 "go.uber.org/zap"
 )
 
@@ -196,8 +196,12 @@ func (b *backend) Snapshot() Snapshot {
 defer b.mu.RUnlock()
 tx, err := b.db.Begin(false)
 if err != nil {
+if b.lg != nil {
+b.lg.Fatal("failed to begin tx", zap.Error(err))
+} else {
 plog.Fatalf("cannot begin tx (%s)", err)
 }
+}
 
 stopc, donec := make(chan struct{}), make(chan struct{})
 dbBytes := tx.Size()
@@ -22,6 +22,7 @@ import (
 "time"
 
 bolt "github.com/coreos/bbolt"
+"go.uber.org/zap"
 )
 
 type BatchTx interface {
@@ -47,8 +48,16 @@ type batchTx struct {
 func (t *batchTx) UnsafeCreateBucket(name []byte) {
 _, err := t.tx.CreateBucket(name)
 if err != nil && err != bolt.ErrBucketExists {
+if t.backend.lg != nil {
+t.backend.lg.Fatal(
+"failed to create a bucket",
+zap.String("bucket-name", string(name)),
+zap.Error(err),
+)
+} else {
 plog.Fatalf("cannot create bucket %s (%v)", name, err)
 }
+}
 t.pending++
 }
 
@@ -65,16 +74,31 @@ func (t *batchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
 func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
 bucket := t.tx.Bucket(bucketName)
 if bucket == nil {
+if t.backend.lg != nil {
+t.backend.lg.Fatal(
+"failed to find a bucket",
+zap.String("bucket-name", string(bucketName)),
+)
+} else {
 plog.Fatalf("bucket %s does not exist", bucketName)
 }
+}
 if seq {
 // it is useful to increase fill percent when the workloads are mostly append-only.
 // this can delay the page split and reduce space usage.
 bucket.FillPercent = 0.9
 }
 if err := bucket.Put(key, value); err != nil {
+if t.backend.lg != nil {
+t.backend.lg.Fatal(
+"failed to write to a bucket",
+zap.String("bucket-name", string(bucketName)),
+zap.Error(err),
+)
+} else {
 plog.Fatalf("cannot put key into bucket (%v)", err)
 }
+}
 t.pending++
 }
 
@@ -82,8 +106,15 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
 func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
 bucket := t.tx.Bucket(bucketName)
 if bucket == nil {
+if t.backend.lg != nil {
+t.backend.lg.Fatal(
+"failed to find a bucket",
+zap.String("bucket-name", string(bucketName)),
+)
+} else {
 plog.Fatalf("bucket %s does not exist", bucketName)
 }
+}
 return unsafeRange(bucket.Cursor(), key, endKey, limit)
 }
 
@@ -113,12 +144,27 @@ func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte
 func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
 bucket := t.tx.Bucket(bucketName)
 if bucket == nil {
+if t.backend.lg != nil {
+t.backend.lg.Fatal(
+"failed to find a bucket",
+zap.String("bucket-name", string(bucketName)),
+)
+} else {
 plog.Fatalf("bucket %s does not exist", bucketName)
 }
+}
 err := bucket.Delete(key)
 if err != nil {
+if t.backend.lg != nil {
+t.backend.lg.Fatal(
+"failed to delete a key",
+zap.String("bucket-name", string(bucketName)),
+zap.Error(err),
+)
+} else {
 plog.Fatalf("cannot delete key from bucket (%v)", err)
 }
+}
 t.pending++
 }
 
@@ -177,9 +223,16 @@ func (t *batchTx) commit(stop bool) {
 
 t.pending = 0
 if err != nil {
+if t.backend.lg != nil {
+t.backend.lg.Fatal(
+"failed to commit tx",
+zap.Error(err),
+)
+} else {
 plog.Fatalf("cannot commit tx (%s)", err)
 }
 }
+}
 if !stop {
 t.tx = t.backend.begin(true)
 }
@@ -236,8 +289,15 @@ func (t *batchTxBuffered) commit(stop bool) {
 func (t *batchTxBuffered) unsafeCommit(stop bool) {
 if t.backend.readTx.tx != nil {
 if err := t.backend.readTx.tx.Rollback(); err != nil {
+if t.backend.lg != nil {
+t.backend.lg.Fatal(
+"failed to rollback tx",
+zap.Error(err),
+)
+} else {
 plog.Fatalf("cannot rollback tx (%s)", err)
 }
+}
 t.backend.readTx.reset()
 }
 
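Throughout the backend hunks above, printf-style messages such as plog.Fatalf("cannot create bucket %s (%v)", name, err) become a constant message plus typed zap fields. The following is a small runnable sketch of that field style, with Info in place of the Fatal calls so it does not exit; the bucket name and error are made up.

package main

import (
	"errors"

	"go.uber.org/zap"
)

func main() {
	lg := zap.NewExample() // JSON to stdout, no timestamps

	name := []byte("key")
	err := errors.New("bucket already exists")

	// Constant message plus typed fields instead of a printf format string.
	lg.Info(
		"failed to create a bucket",
		zap.String("bucket-name", string(name)),
		zap.Error(err),
	)
	// Prints roughly:
	// {"level":"info","msg":"failed to create a bucket","bucket-name":"key","error":"bucket already exists"}
}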
@@ -57,12 +57,12 @@ func (ti *treeIndex) Put(key []byte, rev revision) {
 defer ti.Unlock()
 item := ti.tree.Get(keyi)
 if item == nil {
-keyi.put(rev.main, rev.sub)
+keyi.put(ti.lg, rev.main, rev.sub)
 ti.tree.ReplaceOrInsert(keyi)
 return
 }
 okeyi := item.(*keyIndex)
-okeyi.put(rev.main, rev.sub)
+okeyi.put(ti.lg, rev.main, rev.sub)
 }
 
 func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
@@ -72,7 +72,7 @@ func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
 if keyi = ti.keyIndex(keyi); keyi == nil {
 return revision{}, revision{}, 0, ErrRevisionNotFound
 }
-return keyi.get(atRev)
+return keyi.get(ti.lg, atRev)
 }
 
 func (ti *treeIndex) KeyIndex(keyi *keyIndex) *keyIndex {
@@ -112,7 +112,7 @@ func (ti *treeIndex) Revisions(key, end []byte, atRev int64) (revs []revision) {
 return []revision{rev}
 }
 ti.visit(key, end, func(ki *keyIndex) {
-if rev, _, _, err := ki.get(atRev); err == nil {
+if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
 revs = append(revs, rev)
 }
 })
@@ -128,7 +128,7 @@ func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []revision) {
 return [][]byte{key}, []revision{rev}
 }
 ti.visit(key, end, func(ki *keyIndex) {
-if rev, _, _, err := ki.get(atRev); err == nil {
+if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
 revs = append(revs, rev)
 keys = append(keys, ki.key)
 }
@@ -147,7 +147,7 @@ func (ti *treeIndex) Tombstone(key []byte, rev revision) error {
 }
 
 ki := item.(*keyIndex)
-return ki.tombstone(rev.main, rev.sub)
+return ki.tombstone(ti.lg, rev.main, rev.sub)
 }
 
 // RangeSince returns all revisions from key(including) to end(excluding)
@@ -165,7 +165,7 @@ func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision {
 return nil
 }
 keyi = item.(*keyIndex)
-return keyi.since(rev)
+return keyi.since(ti.lg, rev)
 }
 
 endi := &keyIndex{key: end}
@@ -175,7 +175,7 @@ func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision {
 return false
 }
 curKeyi := item.(*keyIndex)
-revs = append(revs, curKeyi.since(rev)...)
+revs = append(revs, curKeyi.since(ti.lg, rev)...)
 return true
 })
 sort.Sort(revisions(revs))
@@ -199,7 +199,7 @@ func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
 //Lock is needed here to prevent modification to the keyIndex while
 //compaction is going on or revision added to empty before deletion
 ti.Lock()
-keyi.compact(rev, available)
+keyi.compact(ti.lg, rev, available)
 if keyi.isEmpty() {
 item := ti.tree.Delete(keyi)
 if item == nil {
@@ -284,10 +284,10 @@ func restore(ti *treeIndex, key []byte, created, modified revision, ver int64) {
 defer ti.Unlock()
 item := ti.tree.Get(keyi)
 if item == nil {
-keyi.restore(created, modified, ver)
+keyi.restore(ti.lg, created, modified, ver)
 ti.tree.ReplaceOrInsert(keyi)
 return
 }
 okeyi := item.(*keyIndex)
-okeyi.put(modified.main, modified.sub)
+okeyi.put(ti.lg, modified.main, modified.sub)
 }
@@ -20,6 +20,7 @@ import (
 "fmt"
 
 "github.com/google/btree"
+"go.uber.org/zap"
 )
 
 var (
@@ -73,12 +74,22 @@ type keyIndex struct {
 }
 
 // put puts a revision to the keyIndex.
-func (ki *keyIndex) put(main int64, sub int64) {
+func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
 rev := revision{main: main, sub: sub}
 
 if !rev.GreaterThan(ki.modified) {
+if lg != nil {
+lg.Panic(
+"'put' with an unexpected smaller revision",
+zap.Int64("given-revision-main", rev.main),
+zap.Int64("given-revision-sub", rev.sub),
+zap.Int64("modified-revision-main", ki.modified.main),
+zap.Int64("modified-revision-sub", ki.modified.sub),
+)
+} else {
 plog.Panicf("store.keyindex: put with unexpected smaller revision [%v / %v]", rev, ki.modified)
 }
+}
 if len(ki.generations) == 0 {
 ki.generations = append(ki.generations, generation{})
 }
@@ -92,10 +103,17 @@ func (ki *keyIndex) put(main int64, sub int64) {
 ki.modified = rev
 }
 
-func (ki *keyIndex) restore(created, modified revision, ver int64) {
+func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int64) {
 if len(ki.generations) != 0 {
+if lg != nil {
+lg.Panic(
+"'restore' got an unexpected non-empty generations",
+zap.Int("generations-size", len(ki.generations)),
+)
+} else {
 plog.Panicf("store.keyindex: cannot restore non-empty keyIndex")
 }
+}
 
 ki.modified = modified
 g := generation{created: created, ver: ver, revs: []revision{modified}}
@@ -106,14 +124,21 @@ func (ki *keyIndex) restore(created, modified revision, ver int64) {
 // tombstone puts a revision, pointing to a tombstone, to the keyIndex.
 // It also creates a new empty generation in the keyIndex.
 // It returns ErrRevisionNotFound when tombstone on an empty generation.
-func (ki *keyIndex) tombstone(main int64, sub int64) error {
+func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
 if ki.isEmpty() {
+if lg != nil {
+lg.Panic(
+"'tombstone' got an unexpected empty keyIndex",
+zap.String("key", string(ki.key)),
+)
+} else {
 plog.Panicf("store.keyindex: unexpected tombstone on empty keyIndex %s", string(ki.key))
 }
+}
 if ki.generations[len(ki.generations)-1].isEmpty() {
 return ErrRevisionNotFound
 }
-ki.put(main, sub)
+ki.put(lg, main, sub)
 ki.generations = append(ki.generations, generation{})
 keysGauge.Dec()
 return nil
@@ -121,10 +146,17 @@ func (ki *keyIndex) tombstone(main int64, sub int64) error {
 
 // get gets the modified, created revision and version of the key that satisfies the given atRev.
 // Rev must be higher than or equal to the given atRev.
-func (ki *keyIndex) get(atRev int64) (modified, created revision, ver int64, err error) {
+func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
 if ki.isEmpty() {
+if lg != nil {
+lg.Panic(
+"'get' got an unexpected empty keyIndex",
+zap.String("key", string(ki.key)),
+)
+} else {
 plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
 }
+}
 g := ki.findGeneration(atRev)
 if g.isEmpty() {
 return revision{}, revision{}, 0, ErrRevisionNotFound
@@ -141,10 +173,17 @@ func (ki *keyIndex) get(atRev int64) (modified, created revision, ver int64, err error) {
 // since returns revisions since the given rev. Only the revision with the
 // largest sub revision will be returned if multiple revisions have the same
 // main revision.
-func (ki *keyIndex) since(rev int64) []revision {
+func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision {
 if ki.isEmpty() {
+if lg != nil {
+lg.Panic(
+"'since' got an unexpected empty keyIndex",
+zap.String("key", string(ki.key)),
+)
+} else {
 plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
 }
+}
 since := revision{rev, 0}
 var gi int
 // find the generations to start checking
@@ -182,10 +221,17 @@ func (ki *keyIndex) since(rev int64) []revision {
 // revision than the given atRev except the largest one (If the largest one is
 // a tombstone, it will not be kept).
 // If a generation becomes empty during compaction, it will be removed.
-func (ki *keyIndex) compact(atRev int64, available map[revision]struct{}) {
+func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision]struct{}) {
 if ki.isEmpty() {
+if lg != nil {
+lg.Panic(
+"'compact' got an unexpected empty keyIndex",
+zap.String("key", string(ki.key)),
+)
+} else {
 plog.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key))
 }
+}
 
 genIdx, revIndex := ki.doCompact(atRev, available)
 
@@ -17,6 +17,8 @@ package mvcc
 import (
 "reflect"
 "testing"
+
+"go.uber.org/zap"
 )
 
 func TestKeyIndexGet(t *testing.T) {
@@ -28,7 +30,7 @@ func TestKeyIndexGet(t *testing.T) {
 // {{8, 0}[1], {10, 0}[2], {12, 0}(t)[3]}
 // {{2, 0}[1], {4, 0}[2], {6, 0}(t)[3]}
 ki := newTestKeyIndex()
-ki.compact(4, make(map[revision]struct{}))
+ki.compact(zap.NewExample(), 4, make(map[revision]struct{}))
 
 tests := []struct {
 rev int64
@@ -68,7 +70,7 @@ func TestKeyIndexGet(t *testing.T) {
 }
 
 for i, tt := range tests {
-mod, creat, ver, err := ki.get(tt.rev)
+mod, creat, ver, err := ki.get(zap.NewExample(), tt.rev)
 if err != tt.werr {
 t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
 }
@@ -86,7 +88,7 @@ func TestKeyIndexGet(t *testing.T) {
 
 func TestKeyIndexSince(t *testing.T) {
 ki := newTestKeyIndex()
-ki.compact(4, make(map[revision]struct{}))
+ki.compact(zap.NewExample(), 4, make(map[revision]struct{}))
 
 allRevs := []revision{{4, 0}, {6, 0}, {8, 0}, {10, 0}, {12, 0}, {14, 1}, {16, 0}}
 tests := []struct {
@@ -115,7 +117,7 @@ func TestKeyIndexSince(t *testing.T) {
 }
 
 for i, tt := range tests {
-revs := ki.since(tt.rev)
+revs := ki.since(zap.NewExample(), tt.rev)
 if !reflect.DeepEqual(revs, tt.wrevs) {
 t.Errorf("#%d: revs = %+v, want %+v", i, revs, tt.wrevs)
 }
@@ -124,7 +126,7 @@ func TestKeyIndexSince(t *testing.T) {
 
 func TestKeyIndexPut(t *testing.T) {
 ki := &keyIndex{key: []byte("foo")}
-ki.put(5, 0)
+ki.put(zap.NewExample(), 5, 0)
 
 wki := &keyIndex{
 key: []byte("foo"),
@@ -135,7 +137,7 @@ func TestKeyIndexPut(t *testing.T) {
 t.Errorf("ki = %+v, want %+v", ki, wki)
 }
 
-ki.put(7, 0)
+ki.put(zap.NewExample(), 7, 0)
 
 wki = &keyIndex{
 key: []byte("foo"),
@@ -149,7 +151,7 @@ func TestKeyIndexPut(t *testing.T) {
 
 func TestKeyIndexRestore(t *testing.T) {
 ki := &keyIndex{key: []byte("foo")}
-ki.restore(revision{5, 0}, revision{7, 0}, 2)
+ki.restore(zap.NewExample(), revision{5, 0}, revision{7, 0}, 2)
 
 wki := &keyIndex{
 key: []byte("foo"),
@@ -163,9 +165,9 @@ func TestKeyIndexRestore(t *testing.T) {
 
 func TestKeyIndexTombstone(t *testing.T) {
 ki := &keyIndex{key: []byte("foo")}
-ki.put(5, 0)
+ki.put(zap.NewExample(), 5, 0)
 
-err := ki.tombstone(7, 0)
+err := ki.tombstone(zap.NewExample(), 7, 0)
 if err != nil {
 t.Errorf("unexpected tombstone error: %v", err)
 }
@@ -179,9 +181,9 @@ func TestKeyIndexTombstone(t *testing.T) {
 t.Errorf("ki = %+v, want %+v", ki, wki)
 }
 
-ki.put(8, 0)
+ki.put(zap.NewExample(), 8, 0)
-ki.put(9, 0)
+ki.put(zap.NewExample(), 9, 0)
-err = ki.tombstone(15, 0)
+err = ki.tombstone(zap.NewExample(), 15, 0)
 if err != nil {
 t.Errorf("unexpected tombstone error: %v", err)
 }
@@ -199,7 +201,7 @@ func TestKeyIndexTombstone(t *testing.T) {
 t.Errorf("ki = %+v, want %+v", ki, wki)
 }
 
-err = ki.tombstone(16, 0)
+err = ki.tombstone(zap.NewExample(), 16, 0)
 if err != ErrRevisionNotFound {
 t.Errorf("tombstone error = %v, want %v", err, ErrRevisionNotFound)
 }
@@ -454,7 +456,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) {
 t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam)
 }
 am = make(map[revision]struct{})
-ki.compact(tt.compact, am)
+ki.compact(zap.NewExample(), tt.compact, am)
 if !reflect.DeepEqual(ki, tt.wki) {
 t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki)
 }
@@ -477,7 +479,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) {
 t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam)
 }
 am = make(map[revision]struct{})
-ki.compact(tt.compact, am)
+ki.compact(zap.NewExample(), tt.compact, am)
 if !reflect.DeepEqual(ki, tt.wki) {
 t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki)
 }
@@ -500,7 +502,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) {
 t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam)
 }
 am = make(map[revision]struct{})
-ki.compact(tt.compact, am)
+ki.compact(zap.NewExample(), tt.compact, am)
 if !reflect.DeepEqual(ki, tt.wki) {
 t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki)
 }
@@ -530,10 +532,10 @@ func cloneGeneration(g *generation) *generation {
 // test that compact on version that higher than last modified version works well
 func TestKeyIndexCompactOnFurtherRev(t *testing.T) {
 ki := &keyIndex{key: []byte("foo")}
-ki.put(1, 0)
+ki.put(zap.NewExample(), 1, 0)
-ki.put(2, 0)
+ki.put(zap.NewExample(), 2, 0)
 am := make(map[revision]struct{})
-ki.compact(3, am)
+ki.compact(zap.NewExample(), 3, am)
 
 wki := &keyIndex{
 key: []byte("foo"),
@@ -685,14 +687,14 @@ func newTestKeyIndex() *keyIndex {
 // {{2, 0}[1], {4, 0}[2], {6, 0}(t)[3]}
 
 ki := &keyIndex{key: []byte("foo")}
-ki.put(2, 0)
+ki.put(zap.NewExample(), 2, 0)
-ki.put(4, 0)
+ki.put(zap.NewExample(), 4, 0)
-ki.tombstone(6, 0)
+ki.tombstone(zap.NewExample(), 6, 0)
-ki.put(8, 0)
+ki.put(zap.NewExample(), 8, 0)
-ki.put(10, 0)
+ki.put(zap.NewExample(), 10, 0)
-ki.tombstone(12, 0)
+ki.tombstone(zap.NewExample(), 12, 0)
-ki.put(14, 0)
+ki.put(zap.NewExample(), 14, 0)
-ki.put(14, 1)
+ki.put(zap.NewExample(), 14, 1)
-ki.tombstone(16, 0)
+ki.tombstone(zap.NewExample(), 16, 0)
 return ki
 }
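The tests above satisfy the new lg *zap.Logger parameters with zap.NewExample(), a small logger that writes JSON to stdout with no timestamps and is convenient in unit tests. The following is a trimmed, hypothetical test in the same style; the package name and logCompaction function are stand-ins, since the revision/keyIndex types are etcd-internal.

package demo

import (
	"testing"

	"go.uber.org/zap"
)

// logCompaction is a stand-in for code that, like keyIndex.compact above,
// now takes an explicit *zap.Logger as its first argument.
func logCompaction(lg *zap.Logger, rev int64) {
	lg.Info("compacted", zap.Int64("compact-revision", rev))
}

func TestLogCompaction(t *testing.T) {
	// zap.NewExample() gives tests a cheap, deterministic logger,
	// matching the test changes above.
	logCompaction(zap.NewExample(), 4)
}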
@@ -351,7 +351,7 @@ func (s *store) restore() error {
 
 // index keys concurrently as they're loaded in from tx
 keysGauge.Set(0)
-rkvc, revc := restoreIntoIndex(s.kvindex)
+rkvc, revc := restoreIntoIndex(s.lg, s.kvindex)
 for {
 keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
 if len(keys) == 0 {
@@ -359,7 +359,7 @@ func (s *store) restore() error {
 }
 // rkvc blocks if the total pending keys exceeds the restore
 // chunk size to keep keys from consuming too much memory.
-restoreChunk(rkvc, keys, vals, keyToLease)
+restoreChunk(s.lg, rkvc, keys, vals, keyToLease)
 if len(keys) < restoreChunkKeys {
 // partial set implies final set
 break
@@ -426,7 +426,7 @@ type revKeyValue struct {
 kstr string
 }
 
-func restoreIntoIndex(idx index) (chan<- revKeyValue, <-chan int64) {
+func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int64) {
 rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)
 go func() {
 currentRev := int64(1)
@@ -457,12 +457,12 @@ func restoreIntoIndex(idx index) (chan<- revKeyValue, <-chan int64) {
 currentRev = rev.main
 if ok {
 if isTombstone(rkv.key) {
-ki.tombstone(rev.main, rev.sub)
+ki.tombstone(lg, rev.main, rev.sub)
 continue
 }
-ki.put(rev.main, rev.sub)
+ki.put(lg, rev.main, rev.sub)
 } else if !isTombstone(rkv.key) {
-ki.restore(revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
+ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
 idx.Insert(ki)
 kiCache[rkv.kstr] = ki
 }
@@ -471,12 +471,16 @@ func restoreIntoIndex(idx index) (chan<- revKeyValue, <-chan int64) {
 return rkvc, revc
 }
 
-func restoreChunk(kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
+func restoreChunk(lg *zap.Logger, kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
 for i, key := range keys {
 rkv := revKeyValue{key: key}
 if err := rkv.kv.Unmarshal(vals[i]); err != nil {
+if lg != nil {
+lg.Fatal("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
+} else {
 plog.Fatalf("cannot unmarshal event: %v", err)
 }
+}
 rkv.kstr = string(rkv.kv.Key)
 if isTombstone(key) {
 delete(keyToLease, rkv.kstr)
@@ -525,10 +529,18 @@ func (s *store) ConsistentIndex() uint64 {
 }
 
 // appendMarkTombstone appends tombstone mark to normal revision bytes.
-func appendMarkTombstone(b []byte) []byte {
+func appendMarkTombstone(lg *zap.Logger, b []byte) []byte {
 if len(b) != revBytesLen {
+if lg != nil {
+lg.Panic(
+"cannot append tombstone mark to non-normal revision bytes",
+zap.Int("expected-revision-bytes-size", revBytesLen),
+zap.Int("given-revision-bytes-size", len(b)),
+)
+} else {
 plog.Panicf("cannot append mark to non normal revision bytes")
 }
+}
 return append(b, markTombstone)
 }
 
@@ -31,6 +31,7 @@ import (
 "github.com/coreos/etcd/mvcc/mvccpb"
 "github.com/coreos/etcd/pkg/schedule"
 "github.com/coreos/etcd/pkg/testutil"
+
 "go.uber.org/zap"
 )
 
@@ -672,7 +673,7 @@ func newTestKeyBytes(rev revision, tombstone bool) []byte {
 bytes := newRevBytes()
 revToBytes(rev, bytes)
 if tombstone {
-bytes = appendMarkTombstone(bytes)
+bytes = appendMarkTombstone(zap.NewExample(), bytes)
 }
 return bytes
 }
@@ -696,6 +697,7 @@ func newFakeStore() *store {
 compactMainRev: -1,
 fifoSched: schedule.NewFIFOScheduler(),
 stopc: make(chan struct{}),
+lg: zap.NewExample(),
 }
 s.ReadView, s.WriteView = &readView{s}, &writeView{s}
 return s
@@ -18,6 +18,7 @@ import (
 "github.com/coreos/etcd/lease"
 "github.com/coreos/etcd/mvcc/backend"
 "github.com/coreos/etcd/mvcc/mvccpb"
+"go.uber.org/zap"
 )
 
 type storeTxnRead struct {
@@ -139,12 +140,27 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
 revToBytes(revpair, revBytes)
 _, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0)
 if len(vs) != 1 {
+if tr.s.lg != nil {
+tr.s.lg.Fatal(
+"range failed to find revision pair",
+zap.Int64("revision-main", revpair.main),
+zap.Int64("revision-sub", revpair.sub),
+)
+} else {
 plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
 }
+}
 if err := kvs[i].Unmarshal(vs[0]); err != nil {
+if tr.s.lg != nil {
+tr.s.lg.Fatal(
+"failed to unmarshal mvccpb.KeyValue",
+zap.Error(err),
+)
+} else {
 plog.Fatalf("cannot unmarshal event: %v", err)
 }
+}
 }
 return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
 }
 
@@ -177,8 +193,15 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
 
 d, err := kv.Marshal()
 if err != nil {
+if tw.storeTxnRead.s.lg != nil {
+tw.storeTxnRead.s.lg.Fatal(
+"failed to marshal mvccpb.KeyValue",
+zap.Error(err),
+)
+} else {
 plog.Fatalf("cannot marshal event: %v", err)
 }
+}
 
 tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
 tw.s.kvindex.Put(key, idxRev)
@@ -190,9 +213,16 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
 }
 err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
 if err != nil {
+if tw.storeTxnRead.s.lg != nil {
+tw.storeTxnRead.s.lg.Fatal(
+"failed to detach old lease from a key",
+zap.Error(err),
+)
+} else {
 plog.Errorf("unexpected error from lease detach: %v", err)
 }
+}
 }
 if leaseID != lease.NoLease {
 if tw.s.le == nil {
 panic("no lessor to attach lease")
@@ -223,20 +253,41 @@ func (tw *storeTxnWrite) delete(key []byte, rev revision) {
 ibytes := newRevBytes()
 idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
 revToBytes(idxRev, ibytes)
-ibytes = appendMarkTombstone(ibytes)
+
+if tw.storeTxnRead.s != nil && tw.storeTxnRead.s.lg != nil {
+ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)
+} else {
+// TODO: remove this in v3.5
+ibytes = appendMarkTombstone(nil, ibytes)
+}
 
 kv := mvccpb.KeyValue{Key: key}
 
 d, err := kv.Marshal()
 if err != nil {
+if tw.storeTxnRead.s.lg != nil {
+tw.storeTxnRead.s.lg.Fatal(
+"failed to marshal mvccpb.KeyValue",
+zap.Error(err),
+)
+} else {
 plog.Fatalf("cannot marshal event: %v", err)
 }
+}
 
 tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
 err = tw.s.kvindex.Tombstone(key, idxRev)
 if err != nil {
+if tw.storeTxnRead.s.lg != nil {
+tw.storeTxnRead.s.lg.Fatal(
+"failed to tombstone an existing key",
+zap.String("key", string(key)),
+zap.Error(err),
+)
+} else {
 plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
 }
+}
 tw.changes = append(tw.changes, kv)
 
 item := lease.LeaseItem{Key: string(key)}
@@ -245,9 +296,16 @@ func (tw *storeTxnWrite) delete(key []byte, rev revision) {
 if leaseID != lease.NoLease {
 err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item})
 if err != nil {
+if tw.storeTxnRead.s.lg != nil {
+tw.storeTxnRead.s.lg.Fatal(
+"failed to detach old lease from a key",
+zap.Error(err),
+)
+} else {
 plog.Errorf("cannot detach %v", err)
 }
+}
 }
 }
 
 func (tw *storeTxnWrite) Changes() []mvccpb.KeyValue { return tw.changes }
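In delete above, the call site cannot assume a logger is wired up yet, so helpers such as appendMarkTombstone accept a *zap.Logger that may be nil, and the caller passes nil as a temporary shim (the "// TODO: remove this in v3.5" branch). The following is a hedged, stand-alone sketch of that shape; the constant and the markTombstone helper are made up for illustration.

package main

import (
	"fmt"

	"go.uber.org/zap"
)

const revBytesLen = 8 + 1 + 8 // illustrative only, not necessarily the real etcd constant

// markTombstone accepts a possibly-nil logger so call sites that have not
// been wired up to zap yet can pass nil during the migration.
func markTombstone(lg *zap.Logger, b []byte) []byte {
	if len(b) != revBytesLen {
		if lg != nil {
			lg.Panic("bad revision bytes", zap.Int("given-size", len(b)))
		} else {
			panic(fmt.Sprintf("bad revision bytes: %d", len(b)))
		}
	}
	return append(b, 't')
}

func main() {
	b := make([]byte, revBytesLen)
	fmt.Println(len(markTombstone(zap.NewExample(), b))) // structured-logger call site
	fmt.Println(len(markTombstone(nil, b)))              // legacy call site passes nil
}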
@@ -16,6 +16,7 @@ package mvcc
 
 import (
 "encoding/binary"
+"fmt"
 
 "github.com/coreos/etcd/mvcc/backend"
 "github.com/coreos/etcd/mvcc/mvccpb"
@@ -47,7 +48,7 @@ func WriteKV(be backend.Backend, kv mvccpb.KeyValue) {
 
 d, err := kv.Marshal()
 if err != nil {
-plog.Fatalf("cannot marshal event: %v", err)
+panic(fmt.Errorf("cannot marshal event: %v", err))
 }
 
 be.BatchTx().Lock()
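WriteKV takes a different route: rather than choosing between zap and plog, it panics with a wrapped error, which keeps the utility free of any logging dependency. The following is a minimal sketch of the same choice; writeKV and its marshal callback are stand-ins, not the etcd API.

package main

import (
	"errors"
	"fmt"
)

// writeKV is a stand-in for a helper that has no logger to report through;
// an unexpected marshal failure is surfaced as a panic instead.
func writeKV(marshal func() ([]byte, error)) []byte {
	d, err := marshal()
	if err != nil {
		panic(fmt.Errorf("cannot marshal event: %v", err))
	}
	return d
}

func main() {
	fmt.Println(string(writeKV(func() ([]byte, error) { return []byte("ok"), nil })))

	defer func() { fmt.Println("recovered:", recover()) }()
	writeKV(func() ([]byte, error) { return nil, errors.New("boom") })
}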
@@ -347,7 +347,13 @@ func (s *watchableStore) syncWatchers() int {
 tx := s.store.b.ReadTx()
 tx.Lock()
 revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
-evs := kvsToEvents(wg, revs, vs)
+var evs []mvccpb.Event
+if s.store != nil && s.store.lg != nil {
+evs = kvsToEvents(s.store.lg, wg, revs, vs)
+} else {
+// TODO: remove this in v3.5
+evs = kvsToEvents(nil, wg, revs, vs)
+}
 tx.Unlock()
 
 var victims watcherBatch
@@ -399,12 +405,16 @@ func (s *watchableStore) syncWatchers() int {
 }
 
 // kvsToEvents gets all events for the watchers from all key-value pairs
-func kvsToEvents(wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
+func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
 for i, v := range vals {
 var kv mvccpb.KeyValue
 if err := kv.Unmarshal(v); err != nil {
+if lg != nil {
+lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
+} else {
 plog.Panicf("cannot unmarshal event: %v", err)
 }
+}
 
 if !wg.contains(string(kv.Key)) {
 continue
@@ -427,8 +437,15 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
 var victim watcherBatch
 for w, eb := range newWatcherBatch(&s.synced, evs) {
 if eb.revs != 1 {
+if s.store != nil && s.store.lg != nil {
+s.store.lg.Panic(
+"unexpected multiple revisions in watch notification",
+zap.Int("number-of-revisions", eb.revs),
+)
+} else {
 plog.Panicf("unexpected multiple revisions in notification")
 }
+}
 if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
 pendingEventsGauge.Add(float64(len(eb.evs)))
 } else {
@@ -29,6 +29,7 @@ import (
 "github.com/coreos/etcd/raftsnap"
 "github.com/coreos/etcd/version"
 
+humanize "github.com/dustin/go-humanize"
 "go.uber.org/zap"
 )
 
@@ -229,7 +230,8 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 return
 }
 
-receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
+msgSize := m.Size()
+receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(msgSize))
 
 if m.Type != raftpb.MsgSnap {
 if h.lg != nil {
@@ -251,7 +253,9 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 "receiving database snapshot",
 zap.String("local-member-id", h.localID.String()),
 zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
-zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
+zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
+zap.Int("incoming-snapshot-message-size-bytes", msgSize),
+zap.String("incoming-snapshot-message-size", humanize.Bytes(uint64(msgSize))),
 )
 } else {
 plog.Infof("receiving database snapshot [index:%d, from %s] ...", m.Snapshot.Metadata.Index, types.ID(m.From))
@@ -263,9 +267,10 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 msg := fmt.Sprintf("failed to save KV snapshot (%v)", err)
 if h.lg != nil {
 h.lg.Warn(
-"failed to save KV snapshot",
+"failed to save incoming database snapshot",
 zap.String("local-member-id", h.localID.String()),
 zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
+zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
 zap.Error(err),
 )
 } else {
@@ -282,7 +287,9 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 "received and saved database snapshot",
 zap.String("local-member-id", h.localID.String()),
 zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
-zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
+zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
+zap.Int64("incoming-snapshot-size-bytes", n),
+zap.String("incoming-snapshot-size", humanize.Bytes(uint64(n))),
 )
 } else {
 plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From))
@@ -28,6 +28,7 @@ import (
 "github.com/coreos/etcd/raft"
 "github.com/coreos/etcd/raftsnap"
 
+"github.com/dustin/go-humanize"
 "go.uber.org/zap"
 )
 
@@ -79,6 +80,8 @@ func (s *snapshotSender) send(merged raftsnap.Message) {
 "sending database snapshot",
 zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
 zap.String("remote-peer-id", types.ID(m.To).String()),
+zap.Int64("bytes", merged.TotalSize),
+zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
 )
 } else {
 plog.Infof("start to send database snapshot [index: %d, to %s]...", m.Snapshot.Metadata.Index, types.ID(m.To))
@@ -92,6 +95,8 @@ func (s *snapshotSender) send(merged raftsnap.Message) {
 "failed to send database snapshot",
 zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
 zap.String("remote-peer-id", types.ID(m.To).String()),
+zap.Int64("bytes", merged.TotalSize),
+zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
 zap.Error(err),
 )
 } else {
@@ -122,6 +127,8 @@ func (s *snapshotSender) send(merged raftsnap.Message) {
 "sent database snapshot",
 zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
 zap.String("remote-peer-id", types.ID(m.To).String()),
+zap.Int64("bytes", merged.TotalSize),
+zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
 )
 } else {
 plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))
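The snapshot log lines above now report both a raw byte count and a human-readable size via humanize.Bytes from github.com/dustin/go-humanize. The following is a small runnable sketch of that pairing; the field names mirror the diff, and the size value is made up.

package main

import (
	humanize "github.com/dustin/go-humanize"
	"go.uber.org/zap"
)

func main() {
	lg := zap.NewExample()

	var totalSize int64 = 5242880 // hypothetical snapshot size in bytes

	lg.Info(
		"sending database snapshot",
		zap.Int64("bytes", totalSize),
		zap.String("size", humanize.Bytes(uint64(totalSize))), // e.g. "5.2 MB"
	)
}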
@@ -23,6 +23,7 @@ import (
 "path/filepath"
 
 "github.com/coreos/etcd/pkg/fileutil"
+
 humanize "github.com/dustin/go-humanize"
 "go.uber.org/zap"
 )