diff --git a/etcdserver/consistent_index.go b/etcdserver/consistent_index.go new file mode 100644 index 000000000..e63126692 --- /dev/null +++ b/etcdserver/consistent_index.go @@ -0,0 +1,25 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdserver + +// consistentIndex represents the offset of an entry in a consistent replica log. +// It implements the storage.ConsistentIndexGetter interface. +// It is always set to the offset of current entry before executing the entry, +// so ConsistentWatchableKV could get the consistent index from it. +type consistentIndex uint64 + +func (i *consistentIndex) setConsistentIndex(v uint64) { *i = consistentIndex(v) } + +func (i *consistentIndex) ConsistentIndex() uint64 { return uint64(*i) } diff --git a/etcdserver/consistent_index_test.go b/etcdserver/consistent_index_test.go new file mode 100644 index 000000000..8ce949ed1 --- /dev/null +++ b/etcdserver/consistent_index_test.go @@ -0,0 +1,25 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdserver + +import "testing" + +func TestConsistentIndex(t *testing.T) { + var i consistentIndex + i.setConsistentIndex(10) + if g := i.ConsistentIndex(); g != 10 { + t.Errorf("value = %d, want 10", g) + } +} diff --git a/etcdserver/server.go b/etcdserver/server.go index 6d1ef3976..146ba4022 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -161,13 +161,16 @@ type EtcdServer struct { cluster *cluster store store.Store - kv dstorage.KV + kv dstorage.ConsistentWatchableKV stats *stats.ServerStats lstats *stats.LeaderStats SyncTicker <-chan time.Time + // consistent index used to hold the offset of current executing entry + // It is initialized to 0 before executing any entry. + consistIndex consistentIndex // versionTr used to send requests for peer version versionTr *http.Transport reqIDGen *idutil.Generator @@ -345,7 +348,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { if err != nil && err != os.ErrExist { return nil, err } - srv.kv = dstorage.New(path.Join(cfg.StorageDir(), databaseFilename)) + srv.kv = dstorage.New(path.Join(cfg.StorageDir(), databaseFilename), &srv.consistIndex) if err := srv.kv.Restore(); err != nil { plog.Fatalf("v3 storage restore error: %v", err) } @@ -505,7 +508,7 @@ func (s *EtcdServer) run() { if err := os.Rename(snapfn, fn); err != nil { plog.Panicf("rename snapshot file error: %v", err) } - s.kv = dstorage.New(fn) + s.kv = dstorage.New(fn, &s.consistIndex) if err := s.kv.Restore(); err != nil { plog.Panicf("restore KV error: %v", err) } @@ -826,6 +829,8 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint var err error for i := range es { e := es[i] + // set the consistent index of current executing entry + s.consistIndex.setConsistentIndex(e.Index) switch e.Type { case raftpb.EntryNormal: // raft state machine may generate noop entry when leader confirmation. diff --git a/storage/consistent_watchable_store.go b/storage/consistent_watchable_store.go index ced58a1d4..1125c7014 100644 --- a/storage/consistent_watchable_store.go +++ b/storage/consistent_watchable_store.go @@ -41,6 +41,10 @@ type consistentWatchableStore struct { ig ConsistentIndexGetter } +func New(path string, ig ConsistentIndexGetter) ConsistentWatchableKV { + return newConsistentWatchableStore(path, ig) +} + // newConsistentWatchableStore creates a new consistentWatchableStore // using the file at the given path. // If the file at the given path does not exist then it will be created automatically. diff --git a/storage/kv_test.go b/storage/kv_test.go index 952be988a..2c8f32d8f 100644 --- a/storage/kv_test.go +++ b/storage/kv_test.go @@ -89,7 +89,7 @@ func TestKVRange(t *testing.T) { testKVRange(t, normalRangeFunc) } func TestKVTxnRange(t *testing.T) { testKVRange(t, txnRangeFunc) } func testKVRange(t *testing.T, f rangeFunc) { - s := New(tmpPath) + s := newStore(tmpPath) defer cleanup(s, tmpPath) s.Put([]byte("foo"), []byte("bar")) @@ -156,7 +156,7 @@ func TestKVRangeRev(t *testing.T) { testKVRangeRev(t, normalRangeFunc) } func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, normalRangeFunc) } func testKVRangeRev(t *testing.T, f rangeFunc) { - s := New(tmpPath) + s := newStore(tmpPath) defer cleanup(s, tmpPath) s.Put([]byte("foo"), []byte("bar")) @@ -198,7 +198,7 @@ func TestKVRangeBadRev(t *testing.T) { testKVRangeBadRev(t, normalRangeFunc) func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, normalRangeFunc) } func testKVRangeBadRev(t *testing.T, f rangeFunc) { - s := New(tmpPath) + s := newStore(tmpPath) defer cleanup(s, tmpPath) s.Put([]byte("foo"), []byte("bar")) @@ -230,7 +230,7 @@ func TestKVRangeLimit(t *testing.T) { testKVRangeLimit(t, normalRangeFunc) } func TestKVTxnRangeLimit(t *testing.T) { testKVRangeLimit(t, txnRangeFunc) } func testKVRangeLimit(t *testing.T, f rangeFunc) { - s := New(tmpPath) + s := newStore(tmpPath) defer cleanup(s, tmpPath) s.Put([]byte("foo"), []byte("bar")) @@ -274,7 +274,7 @@ func TestKVPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, normalP func TestKVTxnPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, txnPutFunc) } func testKVPutMultipleTimes(t *testing.T, f putFunc) { - s := New(tmpPath) + s := newStore(tmpPath) defer cleanup(s, tmpPath) for i := 0; i < 10; i++ { @@ -335,7 +335,7 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) { } for i, tt := range tests { - s := New(tmpPath) + s := newStore(tmpPath) s.Put([]byte("foo"), []byte("bar")) s.Put([]byte("foo1"), []byte("bar1")) @@ -354,7 +354,7 @@ func TestKVDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, n func TestKVTxnDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, txnDeleteRangeFunc) } func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) { - s := New(tmpPath) + s := newStore(tmpPath) defer cleanup(s, tmpPath) s.Put([]byte("foo"), []byte("bar")) @@ -374,7 +374,7 @@ func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) { // test that range, put, delete on single key in sequence repeatedly works correctly. func TestKVOperationInSequence(t *testing.T) { - s := New(tmpPath) + s := newStore(tmpPath) defer cleanup(s, tmpPath) for i := 0; i < 10; i++ { @@ -420,7 +420,7 @@ func TestKVOperationInSequence(t *testing.T) { } func TestKVTxnBlockNonTnxOperations(t *testing.T) { - s := New(tmpPath) + s := newStore(tmpPath) defer cleanup(s, tmpPath) tests := []func(){ @@ -451,7 +451,7 @@ func TestKVTxnBlockNonTnxOperations(t *testing.T) { } func TestKVTxnWrongID(t *testing.T) { - s := New(tmpPath) + s := newStore(tmpPath) defer cleanup(s, tmpPath) id := s.TxnBegin() @@ -487,7 +487,7 @@ func TestKVTxnWrongID(t *testing.T) { // test that txn range, put, delete on single key in sequence repeatedly works correctly. func TestKVTnxOperationInSequence(t *testing.T) { - s := New(tmpPath) + s := newStore(tmpPath) defer cleanup(s, tmpPath) for i := 0; i < 10; i++ { @@ -542,7 +542,7 @@ func TestKVTnxOperationInSequence(t *testing.T) { } func TestKVCompactReserveLastValue(t *testing.T) { - s := New(tmpPath) + s := newStore(tmpPath) defer cleanup(s, tmpPath) s.Put([]byte("foo"), []byte("bar0")) @@ -595,7 +595,7 @@ func TestKVCompactReserveLastValue(t *testing.T) { } func TestKVCompactBad(t *testing.T) { - s := New(tmpPath) + s := newStore(tmpPath) defer cleanup(s, tmpPath) s.Put([]byte("foo"), []byte("bar0")) @@ -627,7 +627,7 @@ func TestKVHash(t *testing.T) { for i := 0; i < len(hashes); i++ { var err error - kv := New(tmpPath) + kv := newStore(tmpPath) kv.Put([]byte("foo0"), []byte("bar0")) kv.Put([]byte("foo1"), []byte("bar0")) hashes[i], err = kv.Hash() @@ -663,7 +663,7 @@ func TestKVRestore(t *testing.T) { }, } for i, tt := range tests { - s := New(tmpPath) + s := newStore(tmpPath) tt(s) var kvss [][]storagepb.KeyValue for k := int64(0); k < 10; k++ { @@ -672,7 +672,7 @@ func TestKVRestore(t *testing.T) { } s.Close() - ns := New(tmpPath) + ns := newStore(tmpPath) ns.Restore() // wait for possible compaction to finish testutil.WaitSchedule() @@ -690,7 +690,7 @@ func TestKVRestore(t *testing.T) { } func TestKVSnapshot(t *testing.T) { - s := New(tmpPath) + s := newStore(tmpPath) defer cleanup(s, tmpPath) s.Put([]byte("foo"), []byte("bar")) @@ -714,7 +714,7 @@ func TestKVSnapshot(t *testing.T) { } f.Close() - ns := New("new_test") + ns := newStore("new_test") defer cleanup(ns, "new_test") ns.Restore() kvs, rev, err := ns.Range([]byte("a"), []byte("z"), 0, 0) diff --git a/storage/kvstore.go b/storage/kvstore.go index 5dcf10d40..4a1385e8e 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -59,10 +59,6 @@ type store struct { stopc chan struct{} } -func New(path string) KV { - return newStore(path) -} - func newStore(path string) *store { s := &store{ b: backend.New(path, batchInterval, batchLimit),