Merge pull request #3742 from yichengq/save-index

etcdserver: save consistent index into v3 storage
Yicheng Qin 2015-10-24 09:48:28 -07:00
commit 7e38f05ceb
6 changed files with 84 additions and 29 deletions
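
The change threads a single counter through two layers: etcdserver records each applied entry's raft index into it, and the v3 storage layer reads it back whenever a transaction begins, persisting the index next to the data. A conceptual sketch of the contract connecting them (the interface is real and appears in the storage diff below; the comment wording here is paraphrase):

type ConsistentIndexGetter interface {
	// ConsistentIndex returns the raft index of the entry currently being applied.
	ConsistentIndex() uint64
}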

View File

@@ -0,0 +1,25 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserver

// consistentIndex represents the offset of an entry in a consistent replica log.
// It implements the storage.ConsistentIndexGetter interface.
// It is always set to the offset of the current entry before that entry is executed,
// so that ConsistentWatchableKV can read the consistent index from it.
type consistentIndex uint64

func (i *consistentIndex) setConsistentIndex(v uint64) { *i = consistentIndex(v) }

func (i *consistentIndex) ConsistentIndex() uint64 { return uint64(*i) }
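
Since *consistentIndex is meant to satisfy storage.ConsistentIndexGetter, a compile-time assertion can make that explicit. A minimal sketch, not part of this commit, assuming the dstorage import alias used in server.go:

package etcdserver

import dstorage "github.com/coreos/etcd/storage"

// Compile-time check: *consistentIndex implements dstorage.ConsistentIndexGetter,
// so &srv.consistIndex can be handed straight to dstorage.New.
var _ dstorage.ConsistentIndexGetter = (*consistentIndex)(nil)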

View File

@@ -0,0 +1,25 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserver

import "testing"

func TestConsistentIndex(t *testing.T) {
	var i consistentIndex
	i.setConsistentIndex(10)
	if g := i.ConsistentIndex(); g != 10 {
		t.Errorf("value = %d, want 10", g)
	}
}

View File

@@ -161,13 +161,16 @@ type EtcdServer struct {
cluster *cluster
store store.Store
- kv dstorage.KV
+ kv dstorage.ConsistentWatchableKV
stats *stats.ServerStats
lstats *stats.LeaderStats
SyncTicker <-chan time.Time
+ // consistent index used to hold the offset of the current executing entry
+ // It is initialized to 0 before executing any entry.
+ consistIndex consistentIndex
// versionTr used to send requests for peer version
versionTr *http.Transport
reqIDGen *idutil.Generator
@@ -345,7 +348,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if err != nil && err != os.ErrExist {
return nil, err
}
- srv.kv = dstorage.New(path.Join(cfg.StorageDir(), databaseFilename))
+ srv.kv = dstorage.New(path.Join(cfg.StorageDir(), databaseFilename), &srv.consistIndex)
if err := srv.kv.Restore(); err != nil {
plog.Fatalf("v3 storage restore error: %v", err)
}
@@ -505,7 +508,7 @@ func (s *EtcdServer) run() {
if err := os.Rename(snapfn, fn); err != nil {
plog.Panicf("rename snapshot file error: %v", err)
}
- s.kv = dstorage.New(fn)
+ s.kv = dstorage.New(fn, &s.consistIndex)
if err := s.kv.Restore(); err != nil {
plog.Panicf("restore KV error: %v", err)
}
@@ -826,6 +829,8 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
var err error
for i := range es {
e := es[i]
+ // set the consistent index of the current executing entry
+ s.consistIndex.setConsistentIndex(e.Index)
switch e.Type {
case raftpb.EntryNormal:
// raft state machine may generate noop entry when leader confirmation.
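
Pulled out of the diff, the invariant the apply loop now maintains reads as follows. A simplified sketch, not the actual etcd code; applyEntry is a hypothetical stand-in for the e.Type switch above:

// applyAll sketches the required ordering: advance the consistent index to
// an entry's raft index *before* executing the entry, so any v3 storage
// transaction opened while applying it persists that index with the write.
func (s *EtcdServer) applyAll(es []raftpb.Entry) {
	for i := range es {
		e := es[i]
		s.consistIndex.setConsistentIndex(e.Index) // record first
		s.applyEntry(e)                            // hypothetical helper; the real code switches on e.Type
	}
}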

View File

@@ -26,8 +26,8 @@ var (
// ConsistentIndexGetter is an interface that wraps the Get method.
// Consistent index is the offset of an entry in a consistent replicated log.
type ConsistentIndexGetter interface {
- // Get gets the consistent index of current executing entry.
- Get() uint64
+ // ConsistentIndex returns the consistent index of the current executing entry.
+ ConsistentIndex() uint64
}
type consistentWatchableStore struct {
@@ -41,6 +41,10 @@ type consistentWatchableStore struct {
ig ConsistentIndexGetter
}
+ func New(path string, ig ConsistentIndexGetter) ConsistentWatchableKV {
+ return newConsistentWatchableStore(path, ig)
+ }
// newConsistentWatchableStore creates a new consistentWatchableStore
// using the file at the given path.
// If the file at the given path does not exist then it will be created automatically.
@@ -80,7 +84,7 @@ func (s *consistentWatchableStore) TxnBegin() int64 {
// TODO: avoid this unnecessary allocation
bs := make([]byte, 8)
- binary.BigEndian.PutUint64(bs, s.ig.Get())
+ binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex())
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
s.watchableStore.store.tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
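
TxnBegin thus writes the current index into the meta bucket as an 8-byte big-endian value on every transaction. A self-contained sketch of just the encoding round trip (backend and bucket plumbing elided; the 829 value is arbitrary):

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// Encode exactly as TxnBegin does: a uint64 index as 8 big-endian bytes.
	index := uint64(829)
	bs := make([]byte, 8)
	binary.BigEndian.PutUint64(bs, index)

	// A restore path can decode the stored bytes to learn the raft index of
	// the last entry whose effects reached the backend.
	fmt.Println(binary.BigEndian.Uint64(bs)) // 829
}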

View File

@@ -89,7 +89,7 @@ func TestKVRange(t *testing.T) { testKVRange(t, normalRangeFunc) }
func TestKVTxnRange(t *testing.T) { testKVRange(t, txnRangeFunc) }
func testKVRange(t *testing.T, f rangeFunc) {
- s := New(tmpPath)
+ s := newStore(tmpPath)
defer cleanup(s, tmpPath)
s.Put([]byte("foo"), []byte("bar"))
@@ -156,7 +156,7 @@ func TestKVRangeRev(t *testing.T) { testKVRangeRev(t, normalRangeFunc) }
func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, normalRangeFunc) }
func testKVRangeRev(t *testing.T, f rangeFunc) {
- s := New(tmpPath)
+ s := newStore(tmpPath)
defer cleanup(s, tmpPath)
s.Put([]byte("foo"), []byte("bar"))
@@ -198,7 +198,7 @@ func TestKVRangeBadRev(t *testing.T) { testKVRangeBadRev(t, normalRangeFunc)
func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, normalRangeFunc) }
func testKVRangeBadRev(t *testing.T, f rangeFunc) {
- s := New(tmpPath)
+ s := newStore(tmpPath)
defer cleanup(s, tmpPath)
s.Put([]byte("foo"), []byte("bar"))
@@ -230,7 +230,7 @@ func TestKVRangeLimit(t *testing.T) { testKVRangeLimit(t, normalRangeFunc) }
func TestKVTxnRangeLimit(t *testing.T) { testKVRangeLimit(t, txnRangeFunc) }
func testKVRangeLimit(t *testing.T, f rangeFunc) {
- s := New(tmpPath)
+ s := newStore(tmpPath)
defer cleanup(s, tmpPath)
s.Put([]byte("foo"), []byte("bar"))
@@ -274,7 +274,7 @@ func TestKVPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, normalP
func TestKVTxnPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, txnPutFunc) }
func testKVPutMultipleTimes(t *testing.T, f putFunc) {
- s := New(tmpPath)
+ s := newStore(tmpPath)
defer cleanup(s, tmpPath)
for i := 0; i < 10; i++ {
@@ -335,7 +335,7 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) {
}
for i, tt := range tests {
- s := New(tmpPath)
+ s := newStore(tmpPath)
s.Put([]byte("foo"), []byte("bar"))
s.Put([]byte("foo1"), []byte("bar1"))
@@ -354,7 +354,7 @@ func TestKVDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, n
func TestKVTxnDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, txnDeleteRangeFunc) }
func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
- s := New(tmpPath)
+ s := newStore(tmpPath)
defer cleanup(s, tmpPath)
s.Put([]byte("foo"), []byte("bar"))
@@ -374,7 +374,7 @@ func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
// test that range, put, delete on single key in sequence repeatedly works correctly.
func TestKVOperationInSequence(t *testing.T) {
- s := New(tmpPath)
+ s := newStore(tmpPath)
defer cleanup(s, tmpPath)
for i := 0; i < 10; i++ {
@@ -420,7 +420,7 @@ func TestKVOperationInSequence(t *testing.T) {
}
func TestKVTxnBlockNonTnxOperations(t *testing.T) {
- s := New(tmpPath)
+ s := newStore(tmpPath)
defer cleanup(s, tmpPath)
tests := []func(){
@@ -451,7 +451,7 @@ func TestKVTxnBlockNonTnxOperations(t *testing.T) {
}
func TestKVTxnWrongID(t *testing.T) {
- s := New(tmpPath)
+ s := newStore(tmpPath)
defer cleanup(s, tmpPath)
id := s.TxnBegin()
@@ -487,7 +487,7 @@ func TestKVTxnWrongID(t *testing.T) {
// test that txn range, put, delete on single key in sequence repeatedly works correctly.
func TestKVTnxOperationInSequence(t *testing.T) {
- s := New(tmpPath)
+ s := newStore(tmpPath)
defer cleanup(s, tmpPath)
for i := 0; i < 10; i++ {
@@ -542,7 +542,7 @@ func TestKVTnxOperationInSequence(t *testing.T) {
}
func TestKVCompactReserveLastValue(t *testing.T) {
- s := New(tmpPath)
+ s := newStore(tmpPath)
defer cleanup(s, tmpPath)
s.Put([]byte("foo"), []byte("bar0"))
@@ -595,7 +595,7 @@ func TestKVCompactReserveLastValue(t *testing.T) {
}
func TestKVCompactBad(t *testing.T) {
- s := New(tmpPath)
+ s := newStore(tmpPath)
defer cleanup(s, tmpPath)
s.Put([]byte("foo"), []byte("bar0"))
@@ -627,7 +627,7 @@ func TestKVHash(t *testing.T) {
for i := 0; i < len(hashes); i++ {
var err error
- kv := New(tmpPath)
+ kv := newStore(tmpPath)
kv.Put([]byte("foo0"), []byte("bar0"))
kv.Put([]byte("foo1"), []byte("bar0"))
hashes[i], err = kv.Hash()
@@ -663,7 +663,7 @@ func TestKVRestore(t *testing.T) {
},
}
for i, tt := range tests {
- s := New(tmpPath)
+ s := newStore(tmpPath)
tt(s)
var kvss [][]storagepb.KeyValue
for k := int64(0); k < 10; k++ {
@@ -672,7 +672,7 @@ func TestKVRestore(t *testing.T) {
}
s.Close()
- ns := New(tmpPath)
+ ns := newStore(tmpPath)
ns.Restore()
// wait for possible compaction to finish
testutil.WaitSchedule()
@@ -690,7 +690,7 @@ func TestKVRestore(t *testing.T) {
}
func TestKVSnapshot(t *testing.T) {
- s := New(tmpPath)
+ s := newStore(tmpPath)
defer cleanup(s, tmpPath)
s.Put([]byte("foo"), []byte("bar"))
@@ -714,7 +714,7 @@ func TestKVSnapshot(t *testing.T) {
}
f.Close()
- ns := New("new_test")
+ ns := newStore("new_test")
defer cleanup(ns, "new_test")
ns.Restore()
kvs, rev, err := ns.Range([]byte("a"), []byte("z"), 0, 0)
@@ -821,7 +821,7 @@ func TestWatchableKVWatch(t *testing.T) {
type indexVal uint64
- func (v *indexVal) Get() uint64 { return uint64(*v) }
+ func (v *indexVal) ConsistentIndex() uint64 { return uint64(*v) }
func TestConsistentWatchableKVConsistentIndex(t *testing.T) {
var idx indexVal
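
With the getter renamed, any uint64-backed type serves as a test double. A hedged sketch of a test that drives it (TestConsistentIndexRoundTrip is illustrative and not part of this commit; tmpPath and cleanup come from this test file):

func TestConsistentIndexRoundTrip(t *testing.T) {
	var idx indexVal
	s := New(tmpPath, &idx)
	defer cleanup(s, tmpPath)

	idx = indexVal(5)
	// This Put goes through TxnBegin, which records index 5 under
	// consistentIndexKeyName in the meta bucket.
	s.Put([]byte("foo"), []byte("bar"))
}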

View File

@@ -59,10 +59,6 @@ type store struct {
stopc chan struct{}
}
- func New(path string) KV {
- return newStore(path)
- }
func newStore(path string) *store {
s := &store{
b: backend.New(path, batchInterval, batchLimit),