mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
*: simplify consistent index handling
This commit is contained in:
parent
00f222ecad
commit
eddc741b5e
@ -371,6 +371,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
|||||||
srv.be = backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename))
|
srv.be = backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename))
|
||||||
srv.lessor = lease.NewLessor(srv.be)
|
srv.lessor = lease.NewLessor(srv.be)
|
||||||
srv.kv = dstorage.New(srv.be, srv.lessor, &srv.consistIndex)
|
srv.kv = dstorage.New(srv.be, srv.lessor, &srv.consistIndex)
|
||||||
|
srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
|
||||||
srv.authStore = auth.NewAuthStore(srv.be)
|
srv.authStore = auth.NewAuthStore(srv.be)
|
||||||
if h := cfg.AutoCompactionRetention; h != 0 {
|
if h := cfg.AutoCompactionRetention; h != 0 {
|
||||||
srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
|
srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
|
||||||
@ -601,6 +602,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
|
|||||||
if err := s.kv.Restore(newbe); err != nil {
|
if err := s.kv.Restore(newbe); err != nil {
|
||||||
plog.Panicf("restore KV error: %v", err)
|
plog.Panicf("restore KV error: %v", err)
|
||||||
}
|
}
|
||||||
|
s.consistIndex.setConsistentIndex(s.kv.ConsistentIndex())
|
||||||
|
|
||||||
// Closing old backend might block until all the txns
|
// Closing old backend might block until all the txns
|
||||||
// on the backend are finished.
|
// on the backend are finished.
|
||||||
@ -997,8 +999,6 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
|
|||||||
var shouldstop bool
|
var shouldstop bool
|
||||||
for i := range es {
|
for i := range es {
|
||||||
e := es[i]
|
e := es[i]
|
||||||
// set the consistent index of current executing entry
|
|
||||||
s.consistIndex.setConsistentIndex(e.Index)
|
|
||||||
switch e.Type {
|
switch e.Type {
|
||||||
case raftpb.EntryNormal:
|
case raftpb.EntryNormal:
|
||||||
// raft state machine may generate noop entry when leader confirmation.
|
// raft state machine may generate noop entry when leader confirmation.
|
||||||
@ -1020,6 +1020,12 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
|
|||||||
req := raftReq.V2
|
req := raftReq.V2
|
||||||
s.w.Trigger(req.ID, s.applyRequest(*req))
|
s.w.Trigger(req.ID, s.applyRequest(*req))
|
||||||
} else {
|
} else {
|
||||||
|
// do not re-apply applied entries.
|
||||||
|
if e.Index <= s.consistIndex.ConsistentIndex() {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
// set the consistent index of current executing entry
|
||||||
|
s.consistIndex.setConsistentIndex(e.Index)
|
||||||
ar := s.applyV3Request(&raftReq)
|
ar := s.applyV3Request(&raftReq)
|
||||||
if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
|
if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
|
||||||
s.w.Trigger(raftReq.ID, ar)
|
s.w.Trigger(raftReq.ID, ar)
|
||||||
|
@ -1,141 +0,0 @@
|
|||||||
// Copyright 2015 CoreOS, Inc.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package storage
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/binary"
|
|
||||||
"log"
|
|
||||||
|
|
||||||
"github.com/coreos/etcd/lease"
|
|
||||||
"github.com/coreos/etcd/storage/backend"
|
|
||||||
"github.com/coreos/etcd/storage/storagepb"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
consistentIndexKeyName = []byte("consistent_index")
|
|
||||||
)
|
|
||||||
|
|
||||||
// ConsistentIndexGetter is an interface that wraps the Get method.
|
|
||||||
// Consistent index is the offset of an entry in a consistent replicated log.
|
|
||||||
type ConsistentIndexGetter interface {
|
|
||||||
// ConsistentIndex returns the consistent index of current executing entry.
|
|
||||||
ConsistentIndex() uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
type consistentWatchableStore struct {
|
|
||||||
*watchableStore
|
|
||||||
// The field is used to get the consistent index of current
|
|
||||||
// executing entry.
|
|
||||||
// When the store finishes executing current entry, it will
|
|
||||||
// put the index got from ConsistentIndexGetter into the
|
|
||||||
// underlying backend. This helps to recover consistent index
|
|
||||||
// when restoring.
|
|
||||||
ig ConsistentIndexGetter
|
|
||||||
|
|
||||||
skip bool // indicate whether or not to skip an operation
|
|
||||||
}
|
|
||||||
|
|
||||||
func New(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) ConsistentWatchableKV {
|
|
||||||
return newConsistentWatchableStore(b, le, ig)
|
|
||||||
}
|
|
||||||
|
|
||||||
// newConsistentWatchableStore creates a new consistentWatchableStore with the give
|
|
||||||
// backend.
|
|
||||||
func newConsistentWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *consistentWatchableStore {
|
|
||||||
return &consistentWatchableStore{
|
|
||||||
watchableStore: newWatchableStore(b, le),
|
|
||||||
ig: ig,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *consistentWatchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
|
|
||||||
id := s.TxnBegin()
|
|
||||||
rev, err := s.TxnPut(id, key, value, lease)
|
|
||||||
if err != nil {
|
|
||||||
log.Panicf("unexpected TxnPut error (%v)", err)
|
|
||||||
}
|
|
||||||
if err := s.TxnEnd(id); err != nil {
|
|
||||||
log.Panicf("unexpected TxnEnd error (%v)", err)
|
|
||||||
}
|
|
||||||
return rev
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *consistentWatchableStore) DeleteRange(key, end []byte) (n, rev int64) {
|
|
||||||
id := s.TxnBegin()
|
|
||||||
n, rev, err := s.TxnDeleteRange(id, key, end)
|
|
||||||
if err != nil {
|
|
||||||
log.Panicf("unexpected TxnDeleteRange error (%v)", err)
|
|
||||||
}
|
|
||||||
if err := s.TxnEnd(id); err != nil {
|
|
||||||
log.Panicf("unexpected TxnEnd error (%v)", err)
|
|
||||||
}
|
|
||||||
return n, rev
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *consistentWatchableStore) TxnBegin() int64 {
|
|
||||||
id := s.watchableStore.TxnBegin()
|
|
||||||
|
|
||||||
// If the consistent index of executing entry is not larger than store
|
|
||||||
// consistent index, skip all operations in this txn.
|
|
||||||
s.skip = s.ig.ConsistentIndex() <= s.consistentIndex()
|
|
||||||
|
|
||||||
if !s.skip {
|
|
||||||
// TODO: avoid this unnecessary allocation
|
|
||||||
bs := make([]byte, 8)
|
|
||||||
binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex())
|
|
||||||
// put the index into the underlying backend
|
|
||||||
// tx has been locked in TxnBegin, so there is no need to lock it again
|
|
||||||
s.watchableStore.store.tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
|
|
||||||
}
|
|
||||||
|
|
||||||
return id
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *consistentWatchableStore) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
|
|
||||||
if s.skip {
|
|
||||||
return nil, 0, nil
|
|
||||||
}
|
|
||||||
return s.watchableStore.TxnRange(txnID, key, end, limit, rangeRev)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *consistentWatchableStore) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) {
|
|
||||||
if s.skip {
|
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
return s.watchableStore.TxnPut(txnID, key, value, lease)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *consistentWatchableStore) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
|
|
||||||
if s.skip {
|
|
||||||
return 0, 0, nil
|
|
||||||
}
|
|
||||||
return s.watchableStore.TxnDeleteRange(txnID, key, end)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *consistentWatchableStore) TxnEnd(txnID int64) error {
|
|
||||||
// reset skip var
|
|
||||||
s.skip = false
|
|
||||||
return s.watchableStore.TxnEnd(txnID)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *consistentWatchableStore) consistentIndex() uint64 {
|
|
||||||
// get the index
|
|
||||||
// tx has been locked in TxnBegin, so there is no need to lock it again
|
|
||||||
_, vs := s.watchableStore.store.tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
|
|
||||||
if len(vs) == 0 {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
return binary.BigEndian.Uint64(vs[0])
|
|
||||||
}
|
|
@ -1,61 +0,0 @@
|
|||||||
// Copyright 2015 CoreOS, Inc.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package storage
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/coreos/etcd/lease"
|
|
||||||
"github.com/coreos/etcd/storage/backend"
|
|
||||||
)
|
|
||||||
|
|
||||||
type indexVal uint64
|
|
||||||
|
|
||||||
func (v *indexVal) ConsistentIndex() uint64 { return uint64(*v) }
|
|
||||||
|
|
||||||
func TestConsistentWatchableStoreConsistentIndex(t *testing.T) {
|
|
||||||
var idx indexVal
|
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
|
||||||
s := newConsistentWatchableStore(b, &lease.FakeLessor{}, &idx)
|
|
||||||
defer cleanup(s, b, tmpPath)
|
|
||||||
|
|
||||||
tests := []uint64{1, 2, 3, 5, 10}
|
|
||||||
for i, tt := range tests {
|
|
||||||
idx = indexVal(tt)
|
|
||||||
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
|
|
||||||
|
|
||||||
id := s.TxnBegin()
|
|
||||||
g := s.consistentIndex()
|
|
||||||
s.TxnEnd(id)
|
|
||||||
if g != tt {
|
|
||||||
t.Errorf("#%d: index = %d, want %d", i, g, tt)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestConsistentWatchableStoreSkip(t *testing.T) {
|
|
||||||
idx := indexVal(5)
|
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
|
||||||
s := newConsistentWatchableStore(b, &lease.FakeLessor{}, &idx)
|
|
||||||
defer cleanup(s, b, tmpPath)
|
|
||||||
|
|
||||||
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
|
|
||||||
|
|
||||||
// put is skipped
|
|
||||||
rev := s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
|
|
||||||
if rev != 0 {
|
|
||||||
t.Errorf("rev = %d, want 0", rev)
|
|
||||||
}
|
|
||||||
}
|
|
@ -101,4 +101,6 @@ type Watchable interface {
|
|||||||
// this entry are skipped and return empty response.
|
// this entry are skipped and return empty response.
|
||||||
type ConsistentWatchableKV interface {
|
type ConsistentWatchableKV interface {
|
||||||
WatchableKV
|
WatchableKV
|
||||||
|
// ConsistentIndex returns the current consistent index of the KV.
|
||||||
|
ConsistentIndex() uint64
|
||||||
}
|
}
|
||||||
|
@ -80,7 +80,7 @@ func TestKVTxnRange(t *testing.T) { testKVRange(t, txnRangeFunc) }
|
|||||||
|
|
||||||
func testKVRange(t *testing.T, f rangeFunc) {
|
func testKVRange(t *testing.T, f rangeFunc) {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := NewStore(b, &lease.FakeLessor{})
|
s := NewStore(b, &lease.FakeLessor{}, nil)
|
||||||
defer cleanup(s, b, tmpPath)
|
defer cleanup(s, b, tmpPath)
|
||||||
|
|
||||||
kvs := put3TestKVs(s)
|
kvs := put3TestKVs(s)
|
||||||
@ -146,7 +146,7 @@ func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, normalRangeFunc) }
|
|||||||
|
|
||||||
func testKVRangeRev(t *testing.T, f rangeFunc) {
|
func testKVRangeRev(t *testing.T, f rangeFunc) {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := NewStore(b, &lease.FakeLessor{})
|
s := NewStore(b, &lease.FakeLessor{}, nil)
|
||||||
defer cleanup(s, b, tmpPath)
|
defer cleanup(s, b, tmpPath)
|
||||||
|
|
||||||
kvs := put3TestKVs(s)
|
kvs := put3TestKVs(s)
|
||||||
@ -182,7 +182,7 @@ func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, normalRangeFunc)
|
|||||||
|
|
||||||
func testKVRangeBadRev(t *testing.T, f rangeFunc) {
|
func testKVRangeBadRev(t *testing.T, f rangeFunc) {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := NewStore(b, &lease.FakeLessor{})
|
s := NewStore(b, &lease.FakeLessor{}, nil)
|
||||||
defer cleanup(s, b, tmpPath)
|
defer cleanup(s, b, tmpPath)
|
||||||
|
|
||||||
put3TestKVs(s)
|
put3TestKVs(s)
|
||||||
@ -213,7 +213,7 @@ func TestKVTxnRangeLimit(t *testing.T) { testKVRangeLimit(t, txnRangeFunc) }
|
|||||||
|
|
||||||
func testKVRangeLimit(t *testing.T, f rangeFunc) {
|
func testKVRangeLimit(t *testing.T, f rangeFunc) {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := NewStore(b, &lease.FakeLessor{})
|
s := NewStore(b, &lease.FakeLessor{}, nil)
|
||||||
defer cleanup(s, b, tmpPath)
|
defer cleanup(s, b, tmpPath)
|
||||||
|
|
||||||
kvs := put3TestKVs(s)
|
kvs := put3TestKVs(s)
|
||||||
@ -251,7 +251,7 @@ func TestKVTxnPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, txnPutF
|
|||||||
|
|
||||||
func testKVPutMultipleTimes(t *testing.T, f putFunc) {
|
func testKVPutMultipleTimes(t *testing.T, f putFunc) {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := NewStore(b, &lease.FakeLessor{})
|
s := NewStore(b, &lease.FakeLessor{}, nil)
|
||||||
defer cleanup(s, b, tmpPath)
|
defer cleanup(s, b, tmpPath)
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
@ -313,7 +313,7 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) {
|
|||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := NewStore(b, &lease.FakeLessor{})
|
s := NewStore(b, &lease.FakeLessor{}, nil)
|
||||||
|
|
||||||
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
|
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
|
||||||
s.Put([]byte("foo1"), []byte("bar1"), lease.NoLease)
|
s.Put([]byte("foo1"), []byte("bar1"), lease.NoLease)
|
||||||
@ -333,7 +333,7 @@ func TestKVTxnDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, t
|
|||||||
|
|
||||||
func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
|
func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := NewStore(b, &lease.FakeLessor{})
|
s := NewStore(b, &lease.FakeLessor{}, nil)
|
||||||
defer cleanup(s, b, tmpPath)
|
defer cleanup(s, b, tmpPath)
|
||||||
|
|
||||||
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
|
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
|
||||||
@ -354,7 +354,7 @@ func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
|
|||||||
// test that range, put, delete on single key in sequence repeatedly works correctly.
|
// test that range, put, delete on single key in sequence repeatedly works correctly.
|
||||||
func TestKVOperationInSequence(t *testing.T) {
|
func TestKVOperationInSequence(t *testing.T) {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := NewStore(b, &lease.FakeLessor{})
|
s := NewStore(b, &lease.FakeLessor{}, nil)
|
||||||
defer cleanup(s, b, tmpPath)
|
defer cleanup(s, b, tmpPath)
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
@ -401,7 +401,7 @@ func TestKVOperationInSequence(t *testing.T) {
|
|||||||
|
|
||||||
func TestKVTxnBlockNonTxnOperations(t *testing.T) {
|
func TestKVTxnBlockNonTxnOperations(t *testing.T) {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := NewStore(b, &lease.FakeLessor{})
|
s := NewStore(b, &lease.FakeLessor{}, nil)
|
||||||
|
|
||||||
tests := []func(){
|
tests := []func(){
|
||||||
func() { s.Range([]byte("foo"), nil, 0, 0) },
|
func() { s.Range([]byte("foo"), nil, 0, 0) },
|
||||||
@ -435,7 +435,7 @@ func TestKVTxnBlockNonTxnOperations(t *testing.T) {
|
|||||||
|
|
||||||
func TestKVTxnWrongID(t *testing.T) {
|
func TestKVTxnWrongID(t *testing.T) {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := NewStore(b, &lease.FakeLessor{})
|
s := NewStore(b, &lease.FakeLessor{}, nil)
|
||||||
defer cleanup(s, b, tmpPath)
|
defer cleanup(s, b, tmpPath)
|
||||||
|
|
||||||
id := s.TxnBegin()
|
id := s.TxnBegin()
|
||||||
@ -472,7 +472,7 @@ func TestKVTxnWrongID(t *testing.T) {
|
|||||||
// test that txn range, put, delete on single key in sequence repeatedly works correctly.
|
// test that txn range, put, delete on single key in sequence repeatedly works correctly.
|
||||||
func TestKVTxnOperationInSequence(t *testing.T) {
|
func TestKVTxnOperationInSequence(t *testing.T) {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := NewStore(b, &lease.FakeLessor{})
|
s := NewStore(b, &lease.FakeLessor{}, nil)
|
||||||
defer cleanup(s, b, tmpPath)
|
defer cleanup(s, b, tmpPath)
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
@ -528,7 +528,7 @@ func TestKVTxnOperationInSequence(t *testing.T) {
|
|||||||
|
|
||||||
func TestKVCompactReserveLastValue(t *testing.T) {
|
func TestKVCompactReserveLastValue(t *testing.T) {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := NewStore(b, &lease.FakeLessor{})
|
s := NewStore(b, &lease.FakeLessor{}, nil)
|
||||||
defer cleanup(s, b, tmpPath)
|
defer cleanup(s, b, tmpPath)
|
||||||
|
|
||||||
s.Put([]byte("foo"), []byte("bar0"), 1)
|
s.Put([]byte("foo"), []byte("bar0"), 1)
|
||||||
@ -582,7 +582,7 @@ func TestKVCompactReserveLastValue(t *testing.T) {
|
|||||||
|
|
||||||
func TestKVCompactBad(t *testing.T) {
|
func TestKVCompactBad(t *testing.T) {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := NewStore(b, &lease.FakeLessor{})
|
s := NewStore(b, &lease.FakeLessor{}, nil)
|
||||||
defer cleanup(s, b, tmpPath)
|
defer cleanup(s, b, tmpPath)
|
||||||
|
|
||||||
s.Put([]byte("foo"), []byte("bar0"), lease.NoLease)
|
s.Put([]byte("foo"), []byte("bar0"), lease.NoLease)
|
||||||
@ -615,7 +615,7 @@ func TestKVHash(t *testing.T) {
|
|||||||
for i := 0; i < len(hashes); i++ {
|
for i := 0; i < len(hashes); i++ {
|
||||||
var err error
|
var err error
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
kv := NewStore(b, &lease.FakeLessor{})
|
kv := NewStore(b, &lease.FakeLessor{}, nil)
|
||||||
kv.Put([]byte("foo0"), []byte("bar0"), lease.NoLease)
|
kv.Put([]byte("foo0"), []byte("bar0"), lease.NoLease)
|
||||||
kv.Put([]byte("foo1"), []byte("bar0"), lease.NoLease)
|
kv.Put([]byte("foo1"), []byte("bar0"), lease.NoLease)
|
||||||
hashes[i], err = kv.Hash()
|
hashes[i], err = kv.Hash()
|
||||||
@ -652,7 +652,7 @@ func TestKVRestore(t *testing.T) {
|
|||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := NewStore(b, &lease.FakeLessor{})
|
s := NewStore(b, &lease.FakeLessor{}, nil)
|
||||||
tt(s)
|
tt(s)
|
||||||
var kvss [][]storagepb.KeyValue
|
var kvss [][]storagepb.KeyValue
|
||||||
for k := int64(0); k < 10; k++ {
|
for k := int64(0); k < 10; k++ {
|
||||||
@ -662,7 +662,7 @@ func TestKVRestore(t *testing.T) {
|
|||||||
s.Close()
|
s.Close()
|
||||||
|
|
||||||
// ns should recover the the previous state from backend.
|
// ns should recover the the previous state from backend.
|
||||||
ns := NewStore(b, &lease.FakeLessor{})
|
ns := NewStore(b, &lease.FakeLessor{}, nil)
|
||||||
// wait for possible compaction to finish
|
// wait for possible compaction to finish
|
||||||
testutil.WaitSchedule()
|
testutil.WaitSchedule()
|
||||||
var nkvss [][]storagepb.KeyValue
|
var nkvss [][]storagepb.KeyValue
|
||||||
@ -680,7 +680,7 @@ func TestKVRestore(t *testing.T) {
|
|||||||
|
|
||||||
func TestKVSnapshot(t *testing.T) {
|
func TestKVSnapshot(t *testing.T) {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := NewStore(b, &lease.FakeLessor{})
|
s := NewStore(b, &lease.FakeLessor{}, nil)
|
||||||
defer cleanup(s, b, tmpPath)
|
defer cleanup(s, b, tmpPath)
|
||||||
|
|
||||||
wkvs := put3TestKVs(s)
|
wkvs := put3TestKVs(s)
|
||||||
@ -700,7 +700,7 @@ func TestKVSnapshot(t *testing.T) {
|
|||||||
}
|
}
|
||||||
f.Close()
|
f.Close()
|
||||||
|
|
||||||
ns := NewStore(b, &lease.FakeLessor{})
|
ns := NewStore(b, &lease.FakeLessor{}, nil)
|
||||||
defer ns.Close()
|
defer ns.Close()
|
||||||
kvs, rev, err := ns.Range([]byte("a"), []byte("z"), 0, 0)
|
kvs, rev, err := ns.Range([]byte("a"), []byte("z"), 0, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -716,7 +716,7 @@ func TestKVSnapshot(t *testing.T) {
|
|||||||
|
|
||||||
func TestWatchableKVWatch(t *testing.T) {
|
func TestWatchableKVWatch(t *testing.T) {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}))
|
s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
|
||||||
defer cleanup(s, b, tmpPath)
|
defer cleanup(s, b, tmpPath)
|
||||||
|
|
||||||
w := s.NewWatchStream()
|
w := s.NewWatchStream()
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
package storage
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
"log"
|
"log"
|
||||||
"math"
|
"math"
|
||||||
@ -40,6 +41,7 @@ var (
|
|||||||
markBytePosition = markedRevBytesLen - 1
|
markBytePosition = markedRevBytesLen - 1
|
||||||
markTombstone byte = 't'
|
markTombstone byte = 't'
|
||||||
|
|
||||||
|
consistentIndexKeyName = []byte("consistent_index")
|
||||||
scheduledCompactKeyName = []byte("scheduledCompactRev")
|
scheduledCompactKeyName = []byte("scheduledCompactRev")
|
||||||
finishedCompactKeyName = []byte("finishedCompactRev")
|
finishedCompactKeyName = []byte("finishedCompactRev")
|
||||||
|
|
||||||
@ -49,9 +51,18 @@ var (
|
|||||||
ErrCanceled = errors.New("storage: watcher is canceled")
|
ErrCanceled = errors.New("storage: watcher is canceled")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ConsistentIndexGetter is an interface that wraps the Get method.
|
||||||
|
// Consistent index is the offset of an entry in a consistent replicated log.
|
||||||
|
type ConsistentIndexGetter interface {
|
||||||
|
// ConsistentIndex returns the consistent index of current executing entry.
|
||||||
|
ConsistentIndex() uint64
|
||||||
|
}
|
||||||
|
|
||||||
type store struct {
|
type store struct {
|
||||||
mu sync.Mutex // guards the following
|
mu sync.Mutex // guards the following
|
||||||
|
|
||||||
|
ig ConsistentIndexGetter
|
||||||
|
|
||||||
b backend.Backend
|
b backend.Backend
|
||||||
kvindex index
|
kvindex index
|
||||||
|
|
||||||
@ -72,9 +83,10 @@ type store struct {
|
|||||||
|
|
||||||
// NewStore returns a new store. It is useful to create a store inside
|
// NewStore returns a new store. It is useful to create a store inside
|
||||||
// storage pkg. It should only be used for testing externally.
|
// storage pkg. It should only be used for testing externally.
|
||||||
func NewStore(b backend.Backend, le lease.Lessor) *store {
|
func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *store {
|
||||||
s := &store{
|
s := &store{
|
||||||
b: b,
|
b: b,
|
||||||
|
ig: ig,
|
||||||
kvindex: newTreeIndex(),
|
kvindex: newTreeIndex(),
|
||||||
|
|
||||||
le: le,
|
le: le,
|
||||||
@ -155,6 +167,7 @@ func (s *store) TxnBegin() int64 {
|
|||||||
s.currentRev.sub = 0
|
s.currentRev.sub = 0
|
||||||
s.tx = s.b.BatchTx()
|
s.tx = s.b.BatchTx()
|
||||||
s.tx.Lock()
|
s.tx.Lock()
|
||||||
|
s.saveIndex()
|
||||||
|
|
||||||
s.txnID = rand.Int63()
|
s.txnID = rand.Int63()
|
||||||
return s.txnID
|
return s.txnID
|
||||||
@ -545,6 +558,31 @@ func (s *store) getChanges() []storagepb.KeyValue {
|
|||||||
return changes
|
return changes
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *store) saveIndex() {
|
||||||
|
if s.ig == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
tx := s.tx
|
||||||
|
// TODO: avoid this unnecessary allocation
|
||||||
|
bs := make([]byte, 8)
|
||||||
|
binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex())
|
||||||
|
// put the index into the underlying backend
|
||||||
|
// tx has been locked in TxnBegin, so there is no need to lock it again
|
||||||
|
tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *store) ConsistentIndex() uint64 {
|
||||||
|
// TODO: cache index in a uint64 field?
|
||||||
|
tx := s.b.BatchTx()
|
||||||
|
tx.Lock()
|
||||||
|
defer tx.Unlock()
|
||||||
|
_, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
|
||||||
|
if len(vs) == 0 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return binary.BigEndian.Uint64(vs[0])
|
||||||
|
}
|
||||||
|
|
||||||
// appendMarkTombstone appends tombstone mark to normal revision bytes.
|
// appendMarkTombstone appends tombstone mark to normal revision bytes.
|
||||||
func appendMarkTombstone(b []byte) []byte {
|
func appendMarkTombstone(b []byte) []byte {
|
||||||
if len(b) != revBytesLen {
|
if len(b) != revBytesLen {
|
||||||
|
@ -24,7 +24,7 @@ import (
|
|||||||
|
|
||||||
func BenchmarkStorePut(b *testing.B) {
|
func BenchmarkStorePut(b *testing.B) {
|
||||||
be, tmpPath := backend.NewDefaultTmpBackend()
|
be, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := NewStore(be, &lease.FakeLessor{})
|
s := NewStore(be, &lease.FakeLessor{}, nil)
|
||||||
defer cleanup(s, be, tmpPath)
|
defer cleanup(s, be, tmpPath)
|
||||||
|
|
||||||
// arbitrary number of bytes
|
// arbitrary number of bytes
|
||||||
@ -43,7 +43,7 @@ func BenchmarkStorePut(b *testing.B) {
|
|||||||
// some synchronization operations, such as mutex locking.
|
// some synchronization operations, such as mutex locking.
|
||||||
func BenchmarkStoreTxnPut(b *testing.B) {
|
func BenchmarkStoreTxnPut(b *testing.B) {
|
||||||
be, tmpPath := backend.NewDefaultTmpBackend()
|
be, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := NewStore(be, &lease.FakeLessor{})
|
s := NewStore(be, &lease.FakeLessor{}, nil)
|
||||||
defer cleanup(s, be, tmpPath)
|
defer cleanup(s, be, tmpPath)
|
||||||
|
|
||||||
// arbitrary number of bytes
|
// arbitrary number of bytes
|
||||||
|
@ -62,7 +62,7 @@ func TestScheduleCompaction(t *testing.T) {
|
|||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := NewStore(b, &lease.FakeLessor{})
|
s := NewStore(b, &lease.FakeLessor{}, nil)
|
||||||
tx := s.b.BatchTx()
|
tx := s.b.BatchTx()
|
||||||
|
|
||||||
tx.Lock()
|
tx.Lock()
|
||||||
|
@ -32,7 +32,7 @@ import (
|
|||||||
|
|
||||||
func TestStoreRev(t *testing.T) {
|
func TestStoreRev(t *testing.T) {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := NewStore(b, &lease.FakeLessor{})
|
s := NewStore(b, &lease.FakeLessor{}, nil)
|
||||||
defer s.Close()
|
defer s.Close()
|
||||||
defer os.Remove(tmpPath)
|
defer os.Remove(tmpPath)
|
||||||
|
|
||||||
@ -418,7 +418,7 @@ func TestStoreRestore(t *testing.T) {
|
|||||||
|
|
||||||
func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
|
func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s0 := NewStore(b, &lease.FakeLessor{})
|
s0 := NewStore(b, &lease.FakeLessor{}, nil)
|
||||||
defer os.Remove(tmpPath)
|
defer os.Remove(tmpPath)
|
||||||
|
|
||||||
s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
|
s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
|
||||||
@ -435,7 +435,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
|
|||||||
|
|
||||||
s0.Close()
|
s0.Close()
|
||||||
|
|
||||||
s1 := NewStore(b, &lease.FakeLessor{})
|
s1 := NewStore(b, &lease.FakeLessor{}, nil)
|
||||||
|
|
||||||
// wait for scheduled compaction to be finished
|
// wait for scheduled compaction to be finished
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
@ -473,7 +473,7 @@ func TestTxnPut(t *testing.T) {
|
|||||||
vals := createBytesSlice(bytesN, sliceN)
|
vals := createBytesSlice(bytesN, sliceN)
|
||||||
|
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := NewStore(b, &lease.FakeLessor{})
|
s := NewStore(b, &lease.FakeLessor{}, nil)
|
||||||
defer cleanup(s, b, tmpPath)
|
defer cleanup(s, b, tmpPath)
|
||||||
|
|
||||||
for i := 0; i < sliceN; i++ {
|
for i := 0; i < sliceN; i++ {
|
||||||
@ -494,7 +494,7 @@ func TestTxnPut(t *testing.T) {
|
|||||||
|
|
||||||
func TestTxnBlockBackendForceCommit(t *testing.T) {
|
func TestTxnBlockBackendForceCommit(t *testing.T) {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := NewStore(b, &lease.FakeLessor{})
|
s := NewStore(b, &lease.FakeLessor{}, nil)
|
||||||
defer os.Remove(tmpPath)
|
defer os.Remove(tmpPath)
|
||||||
|
|
||||||
id := s.TxnBegin()
|
id := s.TxnBegin()
|
||||||
|
@ -58,9 +58,13 @@ type watchableStore struct {
|
|||||||
// cancel operations.
|
// cancel operations.
|
||||||
type cancelFunc func()
|
type cancelFunc func()
|
||||||
|
|
||||||
func newWatchableStore(b backend.Backend, le lease.Lessor) *watchableStore {
|
func New(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) ConsistentWatchableKV {
|
||||||
|
return newWatchableStore(b, le, ig)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *watchableStore {
|
||||||
s := &watchableStore{
|
s := &watchableStore{
|
||||||
store: NewStore(b, le),
|
store: NewStore(b, le, ig),
|
||||||
unsynced: newWatcherGroup(),
|
unsynced: newWatcherGroup(),
|
||||||
synced: newWatcherGroup(),
|
synced: newWatcherGroup(),
|
||||||
stopc: make(chan struct{}),
|
stopc: make(chan struct{}),
|
||||||
|
@ -32,7 +32,7 @@ import (
|
|||||||
// we should put to simulate the real-world use cases.
|
// we should put to simulate the real-world use cases.
|
||||||
func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
|
func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
|
||||||
be, tmpPath := backend.NewDefaultTmpBackend()
|
be, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := NewStore(be, &lease.FakeLessor{})
|
s := NewStore(be, &lease.FakeLessor{}, nil)
|
||||||
|
|
||||||
// manually create watchableStore instead of newWatchableStore
|
// manually create watchableStore instead of newWatchableStore
|
||||||
// because newWatchableStore periodically calls syncWatchersLoop
|
// because newWatchableStore periodically calls syncWatchersLoop
|
||||||
@ -89,7 +89,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
|
|||||||
|
|
||||||
func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
|
func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
|
||||||
be, tmpPath := backend.NewDefaultTmpBackend()
|
be, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := newWatchableStore(be, &lease.FakeLessor{})
|
s := newWatchableStore(be, &lease.FakeLessor{}, nil)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
s.store.Close()
|
s.store.Close()
|
||||||
|
@ -28,7 +28,7 @@ import (
|
|||||||
|
|
||||||
func TestWatch(t *testing.T) {
|
func TestWatch(t *testing.T) {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := newWatchableStore(b, &lease.FakeLessor{})
|
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
s.store.Close()
|
s.store.Close()
|
||||||
@ -50,7 +50,7 @@ func TestWatch(t *testing.T) {
|
|||||||
|
|
||||||
func TestNewWatcherCancel(t *testing.T) {
|
func TestNewWatcherCancel(t *testing.T) {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := newWatchableStore(b, &lease.FakeLessor{})
|
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
s.store.Close()
|
s.store.Close()
|
||||||
@ -82,7 +82,7 @@ func TestCancelUnsynced(t *testing.T) {
|
|||||||
// method to sync watchers in unsynced map. We want to keep watchers
|
// method to sync watchers in unsynced map. We want to keep watchers
|
||||||
// in unsynced to test if syncWatchers works as expected.
|
// in unsynced to test if syncWatchers works as expected.
|
||||||
s := &watchableStore{
|
s := &watchableStore{
|
||||||
store: NewStore(b, &lease.FakeLessor{}),
|
store: NewStore(b, &lease.FakeLessor{}, nil),
|
||||||
unsynced: newWatcherGroup(),
|
unsynced: newWatcherGroup(),
|
||||||
|
|
||||||
// to make the test not crash from assigning to nil map.
|
// to make the test not crash from assigning to nil map.
|
||||||
@ -137,7 +137,7 @@ func TestSyncWatchers(t *testing.T) {
|
|||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
|
|
||||||
s := &watchableStore{
|
s := &watchableStore{
|
||||||
store: NewStore(b, &lease.FakeLessor{}),
|
store: NewStore(b, &lease.FakeLessor{}, nil),
|
||||||
unsynced: newWatcherGroup(),
|
unsynced: newWatcherGroup(),
|
||||||
synced: newWatcherGroup(),
|
synced: newWatcherGroup(),
|
||||||
}
|
}
|
||||||
@ -220,7 +220,7 @@ func TestSyncWatchers(t *testing.T) {
|
|||||||
// TestWatchCompacted tests a watcher that watches on a compacted revision.
|
// TestWatchCompacted tests a watcher that watches on a compacted revision.
|
||||||
func TestWatchCompacted(t *testing.T) {
|
func TestWatchCompacted(t *testing.T) {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := newWatchableStore(b, &lease.FakeLessor{})
|
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
s.store.Close()
|
s.store.Close()
|
||||||
@ -257,7 +257,7 @@ func TestWatchCompacted(t *testing.T) {
|
|||||||
|
|
||||||
func TestWatchFutureRev(t *testing.T) {
|
func TestWatchFutureRev(t *testing.T) {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := newWatchableStore(b, &lease.FakeLessor{})
|
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
s.store.Close()
|
s.store.Close()
|
||||||
@ -297,7 +297,7 @@ func TestWatchFutureRev(t *testing.T) {
|
|||||||
// TestWatchBatchUnsynced tests batching on unsynced watchers
|
// TestWatchBatchUnsynced tests batching on unsynced watchers
|
||||||
func TestWatchBatchUnsynced(t *testing.T) {
|
func TestWatchBatchUnsynced(t *testing.T) {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := newWatchableStore(b, &lease.FakeLessor{})
|
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
|
||||||
|
|
||||||
oldMaxRevs := watchBatchMaxRevs
|
oldMaxRevs := watchBatchMaxRevs
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -24,7 +24,7 @@ import (
|
|||||||
|
|
||||||
func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
|
func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
|
||||||
be, tmpPath := backend.NewDefaultTmpBackend()
|
be, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
watchable := newWatchableStore(be, &lease.FakeLessor{})
|
watchable := newWatchableStore(be, &lease.FakeLessor{}, nil)
|
||||||
|
|
||||||
defer cleanup(watchable, be, tmpPath)
|
defer cleanup(watchable, be, tmpPath)
|
||||||
|
|
||||||
|
@ -29,7 +29,7 @@ import (
|
|||||||
// and the watched event attaches the correct watchID.
|
// and the watched event attaches the correct watchID.
|
||||||
func TestWatcherWatchID(t *testing.T) {
|
func TestWatcherWatchID(t *testing.T) {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}))
|
s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
|
||||||
defer cleanup(s, b, tmpPath)
|
defer cleanup(s, b, tmpPath)
|
||||||
|
|
||||||
w := s.NewWatchStream()
|
w := s.NewWatchStream()
|
||||||
@ -81,7 +81,7 @@ func TestWatcherWatchID(t *testing.T) {
|
|||||||
// and returns events with matching prefixes.
|
// and returns events with matching prefixes.
|
||||||
func TestWatcherWatchPrefix(t *testing.T) {
|
func TestWatcherWatchPrefix(t *testing.T) {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}))
|
s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
|
||||||
defer cleanup(s, b, tmpPath)
|
defer cleanup(s, b, tmpPath)
|
||||||
|
|
||||||
w := s.NewWatchStream()
|
w := s.NewWatchStream()
|
||||||
@ -155,7 +155,7 @@ func TestWatcherWatchPrefix(t *testing.T) {
|
|||||||
// with given id inside watchStream.
|
// with given id inside watchStream.
|
||||||
func TestWatchStreamCancelWatcherByID(t *testing.T) {
|
func TestWatchStreamCancelWatcherByID(t *testing.T) {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}))
|
s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
|
||||||
defer cleanup(s, b, tmpPath)
|
defer cleanup(s, b, tmpPath)
|
||||||
|
|
||||||
w := s.NewWatchStream()
|
w := s.NewWatchStream()
|
||||||
@ -198,7 +198,7 @@ func TestWatcherRequestProgress(t *testing.T) {
|
|||||||
// method to sync watchers in unsynced map. We want to keep watchers
|
// method to sync watchers in unsynced map. We want to keep watchers
|
||||||
// in unsynced to test if syncWatchers works as expected.
|
// in unsynced to test if syncWatchers works as expected.
|
||||||
s := &watchableStore{
|
s := &watchableStore{
|
||||||
store: NewStore(b, &lease.FakeLessor{}),
|
store: NewStore(b, &lease.FakeLessor{}, nil),
|
||||||
unsynced: newWatcherGroup(),
|
unsynced: newWatcherGroup(),
|
||||||
synced: newWatcherGroup(),
|
synced: newWatcherGroup(),
|
||||||
}
|
}
|
||||||
|
@ -33,7 +33,7 @@ var (
|
|||||||
|
|
||||||
func initStorage() {
|
func initStorage() {
|
||||||
be := backend.New("storage-bench", time.Duration(batchInterval), batchLimit)
|
be := backend.New("storage-bench", time.Duration(batchInterval), batchLimit)
|
||||||
s = storage.NewStore(be, &lease.FakeLessor{})
|
s = storage.NewStore(be, &lease.FakeLessor{}, nil)
|
||||||
os.Remove("storage-bench") // boltDB has an opened fd, so removing the file is ok
|
os.Remove("storage-bench") // boltDB has an opened fd, so removing the file is ok
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user