diff --git a/etcdserver/server.go b/etcdserver/server.go index dab41d0c5..d8a3d07b9 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -371,6 +371,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { srv.be = backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename)) srv.lessor = lease.NewLessor(srv.be) srv.kv = dstorage.New(srv.be, srv.lessor, &srv.consistIndex) + srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex()) srv.authStore = auth.NewAuthStore(srv.be) if h := cfg.AutoCompactionRetention; h != 0 { srv.compactor = compactor.NewPeriodic(h, srv.kv, srv) @@ -601,6 +602,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { if err := s.kv.Restore(newbe); err != nil { plog.Panicf("restore KV error: %v", err) } + s.consistIndex.setConsistentIndex(s.kv.ConsistentIndex()) // Closing old backend might block until all the txns // on the backend are finished. @@ -997,8 +999,6 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint var shouldstop bool for i := range es { e := es[i] - // set the consistent index of current executing entry - s.consistIndex.setConsistentIndex(e.Index) switch e.Type { case raftpb.EntryNormal: // raft state machine may generate noop entry when leader confirmation. @@ -1020,6 +1020,12 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint req := raftReq.V2 s.w.Trigger(req.ID, s.applyRequest(*req)) } else { + // do not re-apply applied entries. + if e.Index <= s.consistIndex.ConsistentIndex() { + break + } + // set the consistent index of current executing entry + s.consistIndex.setConsistentIndex(e.Index) ar := s.applyV3Request(&raftReq) if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 { s.w.Trigger(raftReq.ID, ar) diff --git a/storage/consistent_watchable_store.go b/storage/consistent_watchable_store.go deleted file mode 100644 index 773b0a0c3..000000000 --- a/storage/consistent_watchable_store.go +++ /dev/null @@ -1,141 +0,0 @@ -// Copyright 2015 CoreOS, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package storage - -import ( - "encoding/binary" - "log" - - "github.com/coreos/etcd/lease" - "github.com/coreos/etcd/storage/backend" - "github.com/coreos/etcd/storage/storagepb" -) - -var ( - consistentIndexKeyName = []byte("consistent_index") -) - -// ConsistentIndexGetter is an interface that wraps the Get method. -// Consistent index is the offset of an entry in a consistent replicated log. -type ConsistentIndexGetter interface { - // ConsistentIndex returns the consistent index of current executing entry. - ConsistentIndex() uint64 -} - -type consistentWatchableStore struct { - *watchableStore - // The field is used to get the consistent index of current - // executing entry. - // When the store finishes executing current entry, it will - // put the index got from ConsistentIndexGetter into the - // underlying backend. This helps to recover consistent index - // when restoring. - ig ConsistentIndexGetter - - skip bool // indicate whether or not to skip an operation -} - -func New(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) ConsistentWatchableKV { - return newConsistentWatchableStore(b, le, ig) -} - -// newConsistentWatchableStore creates a new consistentWatchableStore with the give -// backend. -func newConsistentWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *consistentWatchableStore { - return &consistentWatchableStore{ - watchableStore: newWatchableStore(b, le), - ig: ig, - } -} - -func (s *consistentWatchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64) { - id := s.TxnBegin() - rev, err := s.TxnPut(id, key, value, lease) - if err != nil { - log.Panicf("unexpected TxnPut error (%v)", err) - } - if err := s.TxnEnd(id); err != nil { - log.Panicf("unexpected TxnEnd error (%v)", err) - } - return rev -} - -func (s *consistentWatchableStore) DeleteRange(key, end []byte) (n, rev int64) { - id := s.TxnBegin() - n, rev, err := s.TxnDeleteRange(id, key, end) - if err != nil { - log.Panicf("unexpected TxnDeleteRange error (%v)", err) - } - if err := s.TxnEnd(id); err != nil { - log.Panicf("unexpected TxnEnd error (%v)", err) - } - return n, rev -} - -func (s *consistentWatchableStore) TxnBegin() int64 { - id := s.watchableStore.TxnBegin() - - // If the consistent index of executing entry is not larger than store - // consistent index, skip all operations in this txn. - s.skip = s.ig.ConsistentIndex() <= s.consistentIndex() - - if !s.skip { - // TODO: avoid this unnecessary allocation - bs := make([]byte, 8) - binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex()) - // put the index into the underlying backend - // tx has been locked in TxnBegin, so there is no need to lock it again - s.watchableStore.store.tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs) - } - - return id -} - -func (s *consistentWatchableStore) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) { - if s.skip { - return nil, 0, nil - } - return s.watchableStore.TxnRange(txnID, key, end, limit, rangeRev) -} - -func (s *consistentWatchableStore) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) { - if s.skip { - return 0, nil - } - return s.watchableStore.TxnPut(txnID, key, value, lease) -} - -func (s *consistentWatchableStore) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) { - if s.skip { - return 0, 0, nil - } - return s.watchableStore.TxnDeleteRange(txnID, key, end) -} - -func (s *consistentWatchableStore) TxnEnd(txnID int64) error { - // reset skip var - s.skip = false - return s.watchableStore.TxnEnd(txnID) -} - -func (s *consistentWatchableStore) consistentIndex() uint64 { - // get the index - // tx has been locked in TxnBegin, so there is no need to lock it again - _, vs := s.watchableStore.store.tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0) - if len(vs) == 0 { - return 0 - } - return binary.BigEndian.Uint64(vs[0]) -} diff --git a/storage/consistent_watchable_store_test.go b/storage/consistent_watchable_store_test.go deleted file mode 100644 index c787715d8..000000000 --- a/storage/consistent_watchable_store_test.go +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright 2015 CoreOS, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package storage - -import ( - "testing" - - "github.com/coreos/etcd/lease" - "github.com/coreos/etcd/storage/backend" -) - -type indexVal uint64 - -func (v *indexVal) ConsistentIndex() uint64 { return uint64(*v) } - -func TestConsistentWatchableStoreConsistentIndex(t *testing.T) { - var idx indexVal - b, tmpPath := backend.NewDefaultTmpBackend() - s := newConsistentWatchableStore(b, &lease.FakeLessor{}, &idx) - defer cleanup(s, b, tmpPath) - - tests := []uint64{1, 2, 3, 5, 10} - for i, tt := range tests { - idx = indexVal(tt) - s.Put([]byte("foo"), []byte("bar"), lease.NoLease) - - id := s.TxnBegin() - g := s.consistentIndex() - s.TxnEnd(id) - if g != tt { - t.Errorf("#%d: index = %d, want %d", i, g, tt) - } - } -} - -func TestConsistentWatchableStoreSkip(t *testing.T) { - idx := indexVal(5) - b, tmpPath := backend.NewDefaultTmpBackend() - s := newConsistentWatchableStore(b, &lease.FakeLessor{}, &idx) - defer cleanup(s, b, tmpPath) - - s.Put([]byte("foo"), []byte("bar"), lease.NoLease) - - // put is skipped - rev := s.Put([]byte("foo"), []byte("bar"), lease.NoLease) - if rev != 0 { - t.Errorf("rev = %d, want 0", rev) - } -} diff --git a/storage/kv.go b/storage/kv.go index cd63a354c..fa5ac342a 100644 --- a/storage/kv.go +++ b/storage/kv.go @@ -101,4 +101,6 @@ type Watchable interface { // this entry are skipped and return empty response. type ConsistentWatchableKV interface { WatchableKV + // ConsistentIndex returns the current consistent index of the KV. + ConsistentIndex() uint64 } diff --git a/storage/kv_test.go b/storage/kv_test.go index 0ed0cf8b2..0427779b8 100644 --- a/storage/kv_test.go +++ b/storage/kv_test.go @@ -80,7 +80,7 @@ func TestKVTxnRange(t *testing.T) { testKVRange(t, txnRangeFunc) } func testKVRange(t *testing.T, f rangeFunc) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}) + s := NewStore(b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) kvs := put3TestKVs(s) @@ -146,7 +146,7 @@ func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, normalRangeFunc) } func testKVRangeRev(t *testing.T, f rangeFunc) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}) + s := NewStore(b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) kvs := put3TestKVs(s) @@ -182,7 +182,7 @@ func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, normalRangeFunc) func testKVRangeBadRev(t *testing.T, f rangeFunc) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}) + s := NewStore(b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) put3TestKVs(s) @@ -213,7 +213,7 @@ func TestKVTxnRangeLimit(t *testing.T) { testKVRangeLimit(t, txnRangeFunc) } func testKVRangeLimit(t *testing.T, f rangeFunc) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}) + s := NewStore(b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) kvs := put3TestKVs(s) @@ -251,7 +251,7 @@ func TestKVTxnPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, txnPutF func testKVPutMultipleTimes(t *testing.T, f putFunc) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}) + s := NewStore(b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) for i := 0; i < 10; i++ { @@ -313,7 +313,7 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) { for i, tt := range tests { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}) + s := NewStore(b, &lease.FakeLessor{}, nil) s.Put([]byte("foo"), []byte("bar"), lease.NoLease) s.Put([]byte("foo1"), []byte("bar1"), lease.NoLease) @@ -333,7 +333,7 @@ func TestKVTxnDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, t func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}) + s := NewStore(b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) s.Put([]byte("foo"), []byte("bar"), lease.NoLease) @@ -354,7 +354,7 @@ func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) { // test that range, put, delete on single key in sequence repeatedly works correctly. func TestKVOperationInSequence(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}) + s := NewStore(b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) for i := 0; i < 10; i++ { @@ -401,7 +401,7 @@ func TestKVOperationInSequence(t *testing.T) { func TestKVTxnBlockNonTxnOperations(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}) + s := NewStore(b, &lease.FakeLessor{}, nil) tests := []func(){ func() { s.Range([]byte("foo"), nil, 0, 0) }, @@ -435,7 +435,7 @@ func TestKVTxnBlockNonTxnOperations(t *testing.T) { func TestKVTxnWrongID(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}) + s := NewStore(b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) id := s.TxnBegin() @@ -472,7 +472,7 @@ func TestKVTxnWrongID(t *testing.T) { // test that txn range, put, delete on single key in sequence repeatedly works correctly. func TestKVTxnOperationInSequence(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}) + s := NewStore(b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) for i := 0; i < 10; i++ { @@ -528,7 +528,7 @@ func TestKVTxnOperationInSequence(t *testing.T) { func TestKVCompactReserveLastValue(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}) + s := NewStore(b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) s.Put([]byte("foo"), []byte("bar0"), 1) @@ -582,7 +582,7 @@ func TestKVCompactReserveLastValue(t *testing.T) { func TestKVCompactBad(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}) + s := NewStore(b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) s.Put([]byte("foo"), []byte("bar0"), lease.NoLease) @@ -615,7 +615,7 @@ func TestKVHash(t *testing.T) { for i := 0; i < len(hashes); i++ { var err error b, tmpPath := backend.NewDefaultTmpBackend() - kv := NewStore(b, &lease.FakeLessor{}) + kv := NewStore(b, &lease.FakeLessor{}, nil) kv.Put([]byte("foo0"), []byte("bar0"), lease.NoLease) kv.Put([]byte("foo1"), []byte("bar0"), lease.NoLease) hashes[i], err = kv.Hash() @@ -652,7 +652,7 @@ func TestKVRestore(t *testing.T) { } for i, tt := range tests { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}) + s := NewStore(b, &lease.FakeLessor{}, nil) tt(s) var kvss [][]storagepb.KeyValue for k := int64(0); k < 10; k++ { @@ -662,7 +662,7 @@ func TestKVRestore(t *testing.T) { s.Close() // ns should recover the the previous state from backend. - ns := NewStore(b, &lease.FakeLessor{}) + ns := NewStore(b, &lease.FakeLessor{}, nil) // wait for possible compaction to finish testutil.WaitSchedule() var nkvss [][]storagepb.KeyValue @@ -680,7 +680,7 @@ func TestKVRestore(t *testing.T) { func TestKVSnapshot(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}) + s := NewStore(b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) wkvs := put3TestKVs(s) @@ -700,7 +700,7 @@ func TestKVSnapshot(t *testing.T) { } f.Close() - ns := NewStore(b, &lease.FakeLessor{}) + ns := NewStore(b, &lease.FakeLessor{}, nil) defer ns.Close() kvs, rev, err := ns.Range([]byte("a"), []byte("z"), 0, 0) if err != nil { @@ -716,7 +716,7 @@ func TestKVSnapshot(t *testing.T) { func TestWatchableKVWatch(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{})) + s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() diff --git a/storage/kvstore.go b/storage/kvstore.go index a86aa8e26..df2b696c5 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -15,6 +15,7 @@ package storage import ( + "encoding/binary" "errors" "log" "math" @@ -40,6 +41,7 @@ var ( markBytePosition = markedRevBytesLen - 1 markTombstone byte = 't' + consistentIndexKeyName = []byte("consistent_index") scheduledCompactKeyName = []byte("scheduledCompactRev") finishedCompactKeyName = []byte("finishedCompactRev") @@ -49,9 +51,18 @@ var ( ErrCanceled = errors.New("storage: watcher is canceled") ) +// ConsistentIndexGetter is an interface that wraps the Get method. +// Consistent index is the offset of an entry in a consistent replicated log. +type ConsistentIndexGetter interface { + // ConsistentIndex returns the consistent index of current executing entry. + ConsistentIndex() uint64 +} + type store struct { mu sync.Mutex // guards the following + ig ConsistentIndexGetter + b backend.Backend kvindex index @@ -72,9 +83,10 @@ type store struct { // NewStore returns a new store. It is useful to create a store inside // storage pkg. It should only be used for testing externally. -func NewStore(b backend.Backend, le lease.Lessor) *store { +func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *store { s := &store{ b: b, + ig: ig, kvindex: newTreeIndex(), le: le, @@ -155,6 +167,7 @@ func (s *store) TxnBegin() int64 { s.currentRev.sub = 0 s.tx = s.b.BatchTx() s.tx.Lock() + s.saveIndex() s.txnID = rand.Int63() return s.txnID @@ -545,6 +558,31 @@ func (s *store) getChanges() []storagepb.KeyValue { return changes } +func (s *store) saveIndex() { + if s.ig == nil { + return + } + tx := s.tx + // TODO: avoid this unnecessary allocation + bs := make([]byte, 8) + binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex()) + // put the index into the underlying backend + // tx has been locked in TxnBegin, so there is no need to lock it again + tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs) +} + +func (s *store) ConsistentIndex() uint64 { + // TODO: cache index in a uint64 field? + tx := s.b.BatchTx() + tx.Lock() + defer tx.Unlock() + _, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0) + if len(vs) == 0 { + return 0 + } + return binary.BigEndian.Uint64(vs[0]) +} + // appendMarkTombstone appends tombstone mark to normal revision bytes. func appendMarkTombstone(b []byte) []byte { if len(b) != revBytesLen { diff --git a/storage/kvstore_bench_test.go b/storage/kvstore_bench_test.go index 7bc9e31ca..45c778aff 100644 --- a/storage/kvstore_bench_test.go +++ b/storage/kvstore_bench_test.go @@ -24,7 +24,7 @@ import ( func BenchmarkStorePut(b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(be, &lease.FakeLessor{}) + s := NewStore(be, &lease.FakeLessor{}, nil) defer cleanup(s, be, tmpPath) // arbitrary number of bytes @@ -43,7 +43,7 @@ func BenchmarkStorePut(b *testing.B) { // some synchronization operations, such as mutex locking. func BenchmarkStoreTxnPut(b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(be, &lease.FakeLessor{}) + s := NewStore(be, &lease.FakeLessor{}, nil) defer cleanup(s, be, tmpPath) // arbitrary number of bytes diff --git a/storage/kvstore_compaction_test.go b/storage/kvstore_compaction_test.go index da8a523d3..2bd728175 100644 --- a/storage/kvstore_compaction_test.go +++ b/storage/kvstore_compaction_test.go @@ -62,7 +62,7 @@ func TestScheduleCompaction(t *testing.T) { } for i, tt := range tests { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}) + s := NewStore(b, &lease.FakeLessor{}, nil) tx := s.b.BatchTx() tx.Lock() diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index 6a7905a0f..86ec99f7b 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -32,7 +32,7 @@ import ( func TestStoreRev(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}) + s := NewStore(b, &lease.FakeLessor{}, nil) defer s.Close() defer os.Remove(tmpPath) @@ -418,7 +418,7 @@ func TestStoreRestore(t *testing.T) { func TestRestoreContinueUnfinishedCompaction(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s0 := NewStore(b, &lease.FakeLessor{}) + s0 := NewStore(b, &lease.FakeLessor{}, nil) defer os.Remove(tmpPath) s0.Put([]byte("foo"), []byte("bar"), lease.NoLease) @@ -435,7 +435,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) { s0.Close() - s1 := NewStore(b, &lease.FakeLessor{}) + s1 := NewStore(b, &lease.FakeLessor{}, nil) // wait for scheduled compaction to be finished time.Sleep(100 * time.Millisecond) @@ -473,7 +473,7 @@ func TestTxnPut(t *testing.T) { vals := createBytesSlice(bytesN, sliceN) b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}) + s := NewStore(b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) for i := 0; i < sliceN; i++ { @@ -494,7 +494,7 @@ func TestTxnPut(t *testing.T) { func TestTxnBlockBackendForceCommit(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(b, &lease.FakeLessor{}) + s := NewStore(b, &lease.FakeLessor{}, nil) defer os.Remove(tmpPath) id := s.TxnBegin() diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 375e11e28..818fcafcb 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -58,9 +58,13 @@ type watchableStore struct { // cancel operations. type cancelFunc func() -func newWatchableStore(b backend.Backend, le lease.Lessor) *watchableStore { +func New(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) ConsistentWatchableKV { + return newWatchableStore(b, le, ig) +} + +func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *watchableStore { s := &watchableStore{ - store: NewStore(b, le), + store: NewStore(b, le, ig), unsynced: newWatcherGroup(), synced: newWatcherGroup(), stopc: make(chan struct{}), diff --git a/storage/watchable_store_bench_test.go b/storage/watchable_store_bench_test.go index f58324337..03aa33ed8 100644 --- a/storage/watchable_store_bench_test.go +++ b/storage/watchable_store_bench_test.go @@ -32,7 +32,7 @@ import ( // we should put to simulate the real-world use cases. func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(be, &lease.FakeLessor{}) + s := NewStore(be, &lease.FakeLessor{}, nil) // manually create watchableStore instead of newWatchableStore // because newWatchableStore periodically calls syncWatchersLoop @@ -89,7 +89,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { func BenchmarkWatchableStoreSyncedCancel(b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(be, &lease.FakeLessor{}) + s := newWatchableStore(be, &lease.FakeLessor{}, nil) defer func() { s.store.Close() diff --git a/storage/watchable_store_test.go b/storage/watchable_store_test.go index 8c701b6d0..e1db3e5ae 100644 --- a/storage/watchable_store_test.go +++ b/storage/watchable_store_test.go @@ -28,7 +28,7 @@ import ( func TestWatch(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(b, &lease.FakeLessor{}) + s := newWatchableStore(b, &lease.FakeLessor{}, nil) defer func() { s.store.Close() @@ -50,7 +50,7 @@ func TestWatch(t *testing.T) { func TestNewWatcherCancel(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(b, &lease.FakeLessor{}) + s := newWatchableStore(b, &lease.FakeLessor{}, nil) defer func() { s.store.Close() @@ -82,7 +82,7 @@ func TestCancelUnsynced(t *testing.T) { // method to sync watchers in unsynced map. We want to keep watchers // in unsynced to test if syncWatchers works as expected. s := &watchableStore{ - store: NewStore(b, &lease.FakeLessor{}), + store: NewStore(b, &lease.FakeLessor{}, nil), unsynced: newWatcherGroup(), // to make the test not crash from assigning to nil map. @@ -137,7 +137,7 @@ func TestSyncWatchers(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() s := &watchableStore{ - store: NewStore(b, &lease.FakeLessor{}), + store: NewStore(b, &lease.FakeLessor{}, nil), unsynced: newWatcherGroup(), synced: newWatcherGroup(), } @@ -220,7 +220,7 @@ func TestSyncWatchers(t *testing.T) { // TestWatchCompacted tests a watcher that watches on a compacted revision. func TestWatchCompacted(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(b, &lease.FakeLessor{}) + s := newWatchableStore(b, &lease.FakeLessor{}, nil) defer func() { s.store.Close() @@ -257,7 +257,7 @@ func TestWatchCompacted(t *testing.T) { func TestWatchFutureRev(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(b, &lease.FakeLessor{}) + s := newWatchableStore(b, &lease.FakeLessor{}, nil) defer func() { s.store.Close() @@ -297,7 +297,7 @@ func TestWatchFutureRev(t *testing.T) { // TestWatchBatchUnsynced tests batching on unsynced watchers func TestWatchBatchUnsynced(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(b, &lease.FakeLessor{}) + s := newWatchableStore(b, &lease.FakeLessor{}, nil) oldMaxRevs := watchBatchMaxRevs defer func() { diff --git a/storage/watcher_bench_test.go b/storage/watcher_bench_test.go index 0d14b858e..97d748f24 100644 --- a/storage/watcher_bench_test.go +++ b/storage/watcher_bench_test.go @@ -24,7 +24,7 @@ import ( func BenchmarkKVWatcherMemoryUsage(b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() - watchable := newWatchableStore(be, &lease.FakeLessor{}) + watchable := newWatchableStore(be, &lease.FakeLessor{}, nil) defer cleanup(watchable, be, tmpPath) diff --git a/storage/watcher_test.go b/storage/watcher_test.go index a59b6f555..e87d1c887 100644 --- a/storage/watcher_test.go +++ b/storage/watcher_test.go @@ -29,7 +29,7 @@ import ( // and the watched event attaches the correct watchID. func TestWatcherWatchID(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{})) + s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -81,7 +81,7 @@ func TestWatcherWatchID(t *testing.T) { // and returns events with matching prefixes. func TestWatcherWatchPrefix(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{})) + s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -155,7 +155,7 @@ func TestWatcherWatchPrefix(t *testing.T) { // with given id inside watchStream. func TestWatchStreamCancelWatcherByID(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{})) + s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -198,7 +198,7 @@ func TestWatcherRequestProgress(t *testing.T) { // method to sync watchers in unsynced map. We want to keep watchers // in unsynced to test if syncWatchers works as expected. s := &watchableStore{ - store: NewStore(b, &lease.FakeLessor{}), + store: NewStore(b, &lease.FakeLessor{}, nil), unsynced: newWatcherGroup(), synced: newWatcherGroup(), } diff --git a/tools/benchmark/cmd/storage.go b/tools/benchmark/cmd/storage.go index 4ad3c7556..0420cdb9e 100644 --- a/tools/benchmark/cmd/storage.go +++ b/tools/benchmark/cmd/storage.go @@ -33,7 +33,7 @@ var ( func initStorage() { be := backend.New("storage-bench", time.Duration(batchInterval), batchLimit) - s = storage.NewStore(be, &lease.FakeLessor{}) + s = storage.NewStore(be, &lease.FakeLessor{}, nil) os.Remove("storage-bench") // boltDB has an opened fd, so removing the file is ok }