Save raftpb.ConfState in the backend.

This makes (bbolt) backend a full feature snapshot in term of WAL/raft,
i.e. carries:
  - commit : (applied_index)
  - confState

Benefits:
  - Backend will be a sufficient point in time definition sufficient to
start replaying WAL. We have applied_index & confState in consistent
state.
  - In case of emergency a backend state can be used for recovery
This commit is contained in:
Piotr Tabor 2021-05-13 12:37:43 +02:00
parent 3cb1ba4b2b
commit 865df75714
10 changed files with 283 additions and 64 deletions

View File

@ -694,6 +694,7 @@ func clusterVersionFromStore(lg *zap.Logger, st v2store.Store) *semver.Version {
return semver.Must(semver.NewVersion(*e.Node.Value))
}
// The field is populated since etcd v3.5.
func clusterVersionFromBackend(lg *zap.Logger, be backend.Backend) *semver.Version {
ckey := backendClusterVersionKey()
tx := be.ReadTx()
@ -712,6 +713,7 @@ func clusterVersionFromBackend(lg *zap.Logger, be backend.Backend) *semver.Versi
return semver.Must(semver.NewVersion(string(vals[0])))
}
// The field is populated since etcd v3.5.
func downgradeInfoFromBackend(lg *zap.Logger, be backend.Backend) *DowngradeInfo {
dkey := backendDowngradeKey()
tx := be.ReadTx()

View File

@ -0,0 +1,63 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package membership
import (
"encoding/json"
"log"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/mvcc"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.uber.org/zap"
)
var (
confStateKey = []byte("confState")
)
// MustUnsafeSaveConfStateToBackend persists confState using given transaction (tx).
// confState in backend is persisted since etcd v3.5.
func MustUnsafeSaveConfStateToBackend(lg *zap.Logger, tx backend.BatchTx, confState *raftpb.ConfState) {
confStateBytes, err := json.Marshal(confState)
if err != nil {
lg.Panic("Cannot marshal raftpb.ConfState", zap.Stringer("conf-state", confState), zap.Error(err))
}
tx.UnsafePut(mvcc.MetaBucketName, confStateKey, confStateBytes)
}
// UnsafeConfStateFromBackend retrieves ConfState from the backend.
// Returns nil if confState in backend is not persisted (e.g. backend writen by <v3.5).
func UnsafeConfStateFromBackend(lg *zap.Logger, tx backend.ReadTx) *raftpb.ConfState {
keys, vals := tx.UnsafeRange(mvcc.MetaBucketName, confStateKey, nil, 0)
if len(keys) == 0 {
return nil
}
if len(keys) != 1 {
lg.Panic(
"unexpected number of key: "+string(confStateKey)+" when getting cluster version from backend",
zap.Int("number-of-key", len(keys)),
)
}
var confState raftpb.ConfState
if err := json.Unmarshal(vals[0], &confState); err != nil {
log.Panic("Cannot unmarshal confState json retrieved from the backend",
zap.ByteString("conf-state-json", vals[0]),
zap.Error(err))
}
return &confState
}

View File

@ -0,0 +1,79 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package membership_test
import (
"testing"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.uber.org/zap/zaptest"
)
func TestConfStateFromBackendInOneTx(t *testing.T) {
lg := zaptest.NewLogger(t)
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
tx := be.BatchTx()
cindex.CreateMetaBucket(tx)
tx.Lock()
defer tx.Unlock()
assert.Nil(t, membership.UnsafeConfStateFromBackend(lg, tx))
confState := raftpb.ConfState{Learners: []uint64{1, 2}, Voters: []uint64{3}, AutoLeave: false}
membership.MustUnsafeSaveConfStateToBackend(lg, tx, &confState)
assert.Equal(t, confState, *membership.UnsafeConfStateFromBackend(lg, tx))
}
func TestMustUnsafeSaveConfStateToBackend(t *testing.T) {
lg := zaptest.NewLogger(t)
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
{
tx := be.BatchTx()
cindex.CreateMetaBucket(tx)
tx.Commit()
}
t.Run("missing", func(t *testing.T) {
tx := be.ReadTx()
tx.Lock()
defer tx.Unlock()
assert.Nil(t, membership.UnsafeConfStateFromBackend(lg, tx))
})
confState := raftpb.ConfState{Learners: []uint64{1, 2}, Voters: []uint64{3}, AutoLeave: false}
t.Run("save", func(t *testing.T) {
tx := be.BatchTx()
tx.Lock()
membership.MustUnsafeSaveConfStateToBackend(lg, tx, &confState)
tx.Unlock()
tx.Commit()
})
t.Run("read", func(t *testing.T) {
tx := be.ReadTx()
tx.Lock()
defer tx.Unlock()
assert.Equal(t, confState, *membership.UnsafeConfStateFromBackend(lg, tx))
})
}

View File

@ -161,6 +161,7 @@ func TrimMembershipFromV2Store(lg *zap.Logger, s v2store.Store) error {
return nil
}
// The field is populated since etcd v3.5.
func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
ckey := backendClusterVersionKey()
@ -170,6 +171,7 @@ func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
tx.UnsafePut(clusterBucketName, ckey, []byte(ver.String()))
}
// The field is populated since etcd v3.5.
func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *DowngradeInfo) {
dkey := backendDowngradeKey()
dvalue, err := json.Marshal(downgrade)

View File

@ -260,7 +260,7 @@ type EtcdServer struct {
lessor lease.Lessor
bemu sync.Mutex
be backend.Backend
beHooks backend.Hooks
beHooks *backendHooks
authStore auth.AuthStore
alarmStore *v3alarm.AlarmStore
@ -298,10 +298,31 @@ type EtcdServer struct {
type backendHooks struct {
indexer cindex.ConsistentIndexer
lg *zap.Logger
// confState to be written in the next submitted backend transaction (if dirty)
confState raftpb.ConfState
// first write changes it to 'dirty'. false by default, so
// not initialized `confState` is meaningless.
confStateDirty bool
confStateLock sync.Mutex
}
func (bh *backendHooks) OnPreCommitUnsafe(tx backend.BatchTx) {
bh.indexer.UnsafeSave(tx)
bh.confStateLock.Lock()
defer bh.confStateLock.Unlock()
if bh.confStateDirty {
membership.MustUnsafeSaveConfStateToBackend(bh.lg, tx, &bh.confState)
// save bh.confState
bh.confStateDirty = false
}
}
func (bh *backendHooks) SetConfState(confState *raftpb.ConfState) {
bh.confStateLock.Lock()
defer bh.confStateLock.Unlock()
bh.confState = *confState
bh.confStateDirty = true
}
// NewServer creates a new EtcdServer from the supplied configuration. The
@ -2238,6 +2259,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
lg := s.Logger()
*confState = *s.r.ApplyConfChange(cc)
s.beHooks.SetConfState(confState)
switch cc.Type {
case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode:
confChangeContext := new(membership.ConfigChangeContext)

View File

@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math"
"net/http"
"os"
"path/filepath"
@ -604,12 +605,14 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
Node: newNodeNop(),
transport: newNopTransporter(),
})
lg := zaptest.NewLogger(t)
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
lg: lg,
id: 1,
r: *r,
cluster: cl,
beHooks: &backendHooks{lg: lg},
}
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
@ -638,22 +641,26 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
// TestApplyConfigChangeUpdatesConsistIndex ensures a config change also updates the consistIndex
// where consistIndex equals to applied index.
func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
lg := zaptest.NewLogger(t)
cl := membership.NewCluster(zaptest.NewLogger(t))
cl.SetStore(v2store.New())
cl.AddMember(&membership.Member{ID: types.ID(1)}, true)
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
Node: newNodeNop(),
transport: newNopTransporter(),
})
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
cindex.CreateMetaBucket(be.BatchTx())
ci := cindex.NewConsistentIndex(be)
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
lg: lg,
id: 1,
r: *r,
r: *realisticRaftNode(lg),
cluster: cl,
w: wait.New(),
consistIndex: cindex.NewFakeConsistentIndex(0),
consistIndex: ci,
beHooks: &backendHooks{lg: lg, indexer: ci},
}
// create EntryConfChange entry
@ -680,29 +687,61 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
if consistIndex != appliedi {
t.Fatalf("consistIndex = %v, want %v", consistIndex, appliedi)
}
t.Run("verify-backend", func(t *testing.T) {
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
srv.beHooks.OnPreCommitUnsafe(tx)
assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *membership.UnsafeConfStateFromBackend(lg, tx))
})
assert.Equal(t, consistIndex, cindex.ReadConsistentIndex(be.BatchTx()))
}
func realisticRaftNode(lg *zap.Logger) *raftNode {
storage := raft.NewMemoryStorage()
storage.SetHardState(raftpb.HardState{Commit: 0, Term: 0})
c := &raft.Config{
ID: 1,
ElectionTick: 10,
HeartbeatTick: 1,
Storage: storage,
MaxSizePerMsg: math.MaxUint64,
MaxInflightMsgs: 256,
}
n := raft.RestartNode(c)
r := newRaftNode(raftNodeConfig{
lg: lg,
Node: n,
transport: newNopTransporter(),
})
return r
}
// TestApplyMultiConfChangeShouldStop ensures that apply will return shouldStop
// if the local member is removed along with other conf updates.
func TestApplyMultiConfChangeShouldStop(t *testing.T) {
cl := membership.NewCluster(zaptest.NewLogger(t))
lg := zaptest.NewLogger(t)
cl := membership.NewCluster(lg)
cl.SetStore(v2store.New())
for i := 1; i <= 5; i++ {
cl.AddMember(&membership.Member{ID: types.ID(i)}, true)
}
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
lg: lg,
Node: newNodeNop(),
transport: newNopTransporter(),
})
ci := cindex.NewFakeConsistentIndex(0)
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
lg: lg,
id: 2,
r: *r,
cluster: cl,
w: wait.New(),
consistIndex: cindex.NewFakeConsistentIndex(0),
consistIndex: ci,
beHooks: &backendHooks{lg: lg, indexer: ci},
}
ents := []raftpb.Entry{}
for i := 1; i <= 4; i++ {
@ -1036,12 +1075,13 @@ func TestSnapshot(t *testing.T) {
// TestSnapshotOrdering ensures raft persists snapshot onto disk before
// snapshot db is applied.
func TestSnapshotOrdering(t *testing.T) {
lg := zaptest.NewLogger(t)
n := newNopReadyNode()
st := v2store.New()
cl := membership.NewCluster(zaptest.NewLogger(t))
cl := membership.NewCluster(lg)
cl.SetStore(st)
testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir")
testdir, err := ioutil.TempDir(t.TempDir(), "testsnapdir")
if err != nil {
t.Fatalf("couldn't open tempdir (%v)", err)
}
@ -1056,29 +1096,30 @@ func TestSnapshotOrdering(t *testing.T) {
p := mockstorage.NewStorageRecorderStream(testdir)
tr, snapDoneC := newSnapTransporter(snapdir)
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
lg: lg,
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
Node: n,
transport: tr,
storage: p,
raftStorage: rs,
})
be, tmpPath := betesting.NewDefaultTmpBackend(t)
defer os.RemoveAll(tmpPath)
be, _ := betesting.NewDefaultTmpBackend(t)
ci := cindex.NewConsistentIndex(be)
s := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: config.ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
lg: lg,
Cfg: config.ServerConfig{Logger: lg, DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *r,
v2store: st,
snapshotter: snap.New(zap.NewExample(), snapdir),
snapshotter: snap.New(lg, snapdir),
cluster: cl,
SyncTicker: &time.Ticker{},
consistIndex: cindex.NewConsistentIndex(be),
consistIndex: ci,
beHooks: &backendHooks{lg: lg, indexer: ci},
}
s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster}
s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.StoreConfig{})
s.be = be
s.start()
@ -1190,12 +1231,13 @@ func TestTriggerSnap(t *testing.T) {
// TestConcurrentApplyAndSnapshotV3 will send out snapshots concurrently with
// proposals.
func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
lg := zaptest.NewLogger(t)
n := newNopReadyNode()
st := v2store.New()
cl := membership.NewCluster(zaptest.NewLogger(t))
cl := membership.NewCluster(lg)
cl.SetStore(st)
testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir")
testdir, err := ioutil.TempDir(t.TempDir(), "testsnapdir")
if err != nil {
t.Fatalf("Couldn't open tempdir (%v)", err)
}
@ -1207,31 +1249,30 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
rs := raft.NewMemoryStorage()
tr, snapDoneC := newSnapTransporter(testdir)
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
lg: lg,
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
Node: n,
transport: tr,
storage: mockstorage.NewStorageRecorder(testdir),
raftStorage: rs,
})
be, tmpPath := betesting.NewDefaultTmpBackend(t)
defer func() {
os.RemoveAll(tmpPath)
}()
be, _ := betesting.NewDefaultTmpBackend(t)
ci := cindex.NewConsistentIndex(be)
s := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: config.ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
lg: lg,
Cfg: config.ServerConfig{Logger: lg, DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *r,
v2store: st,
snapshotter: snap.New(zap.NewExample(), testdir),
snapshotter: snap.New(lg, testdir),
cluster: cl,
SyncTicker: &time.Ticker{},
consistIndex: cindex.NewConsistentIndex(be),
consistIndex: ci,
beHooks: &backendHooks{lg: lg, indexer: ci},
}
s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster}
s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.StoreConfig{})
s.be = be
s.start()
@ -1288,6 +1329,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
// TestAddMember tests AddMember can propose and perform node addition.
func TestAddMember(t *testing.T) {
lg := zaptest.NewLogger(t)
n := newNodeConfChangeCommitterRecorder()
n.readyc <- raft.Ready{
SoftState: &raft.SoftState{RaftState: raft.StateLeader},
@ -1296,7 +1338,7 @@ func TestAddMember(t *testing.T) {
st := v2store.New()
cl.SetStore(st)
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
lg: lg,
Node: n,
raftStorage: raft.NewMemoryStorage(),
storage: mockstorage.NewStorageRecorder(""),
@ -1304,13 +1346,14 @@ func TestAddMember(t *testing.T) {
})
s := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
lg: lg,
r: *r,
v2store: st,
cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
consistIndex: cindex.NewFakeConsistentIndex(0),
beHooks: &backendHooks{lg: lg},
}
s.start()
m := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"foo"}}}
@ -1332,6 +1375,7 @@ func TestAddMember(t *testing.T) {
// TestRemoveMember tests RemoveMember can propose and perform node removal.
func TestRemoveMember(t *testing.T) {
lg := zaptest.NewLogger(t)
n := newNodeConfChangeCommitterRecorder()
n.readyc <- raft.Ready{
SoftState: &raft.SoftState{RaftState: raft.StateLeader},
@ -1341,7 +1385,7 @@ func TestRemoveMember(t *testing.T) {
cl.SetStore(v2store.New())
cl.AddMember(&membership.Member{ID: 1234}, true)
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
lg: lg,
Node: n,
raftStorage: raft.NewMemoryStorage(),
storage: mockstorage.NewStorageRecorder(""),
@ -1356,6 +1400,7 @@ func TestRemoveMember(t *testing.T) {
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
consistIndex: cindex.NewFakeConsistentIndex(0),
beHooks: &backendHooks{lg: lg},
}
s.start()
_, err := s.RemoveMember(context.Background(), 1234)
@ -1376,6 +1421,7 @@ func TestRemoveMember(t *testing.T) {
// TestUpdateMember tests RemoveMember can propose and perform node update.
func TestUpdateMember(t *testing.T) {
lg := zaptest.NewLogger(t)
n := newNodeConfChangeCommitterRecorder()
n.readyc <- raft.Ready{
SoftState: &raft.SoftState{RaftState: raft.StateLeader},
@ -1385,7 +1431,7 @@ func TestUpdateMember(t *testing.T) {
cl.SetStore(st)
cl.AddMember(&membership.Member{ID: 1234}, true)
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
lg: lg,
Node: n,
raftStorage: raft.NewMemoryStorage(),
storage: mockstorage.NewStorageRecorder(""),
@ -1393,13 +1439,14 @@ func TestUpdateMember(t *testing.T) {
})
s := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
lg: lg,
r: *r,
v2store: st,
cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
consistIndex: cindex.NewFakeConsistentIndex(0),
beHooks: &backendHooks{lg: lg},
}
s.start()
wm := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}}
@ -1422,6 +1469,7 @@ func TestUpdateMember(t *testing.T) {
// TODO: test server could stop itself when being removed
func TestPublish(t *testing.T) {
lg := zaptest.NewLogger(t)
n := newNodeRecorder()
ch := make(chan interface{}, 1)
// simulate that request has gone through consensus
@ -1430,11 +1478,11 @@ func TestPublish(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
lg: lg,
readych: make(chan struct{}),
Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
id: 1,
r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}),
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
cluster: &membership.RaftCluster{},
w: w,
@ -1476,16 +1524,17 @@ func TestPublish(t *testing.T) {
// TestPublishStopped tests that publish will be stopped if server is stopped.
func TestPublishStopped(t *testing.T) {
lg := zaptest.NewLogger(t)
ctx, cancel := context.WithCancel(context.Background())
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
lg: lg,
Node: newNodeNop(),
transport: newNopTransporter(),
})
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
lg: lg,
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *r,
cluster: &membership.RaftCluster{},
w: mockwait.NewNop(),
@ -1504,13 +1553,15 @@ func TestPublishStopped(t *testing.T) {
// TestPublishRetry tests that publish will keep retry until success.
func TestPublishRetry(t *testing.T) {
lg := zaptest.NewLogger(t)
ctx, cancel := context.WithCancel(context.Background())
n := newNodeRecorderStream()
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
lg: lg,
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}),
w: mockwait.NewNop(),
stopping: make(chan struct{}),
reqIDGen: idutil.NewGenerator(0, time.Time{}),

View File

@ -35,7 +35,7 @@ import (
var (
keyBucketName = []byte("key")
metaBucketName = cindex.MetaBucketName
MetaBucketName = cindex.MetaBucketName
scheduledCompactKeyName = []byte("scheduledCompactRev")
finishedCompactKeyName = []byte("finishedCompactRev")
@ -242,7 +242,7 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {
tx := s.b.BatchTx()
tx.Lock()
tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
tx.UnsafePut(MetaBucketName, scheduledCompactKeyName, rbytes)
tx.Unlock()
// ensure that desired compaction is persisted
s.b.ForceCommit()
@ -304,7 +304,7 @@ func init() {
DefaultIgnores = map[backend.IgnoreKey]struct{}{
// consistent index might be changed due to v2 internal sync, which
// is not controllable by the user.
{Bucket: string(metaBucketName), Key: string(cindex.ConsistentIndexKeyName)}: {},
{Bucket: string(MetaBucketName), Key: string(cindex.ConsistentIndexKeyName)}: {},
}
}
@ -351,20 +351,20 @@ func (s *store) restore() error {
tx := s.b.BatchTx()
tx.Lock()
_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
_, finishedCompactBytes := tx.UnsafeRange(MetaBucketName, finishedCompactKeyName, nil, 0)
if len(finishedCompactBytes) != 0 {
s.revMu.Lock()
s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
s.lg.Info(
"restored last compact revision",
zap.String("meta-bucket-name", string(metaBucketName)),
zap.String("meta-bucket-name", string(MetaBucketName)),
zap.String("meta-bucket-name-key", string(finishedCompactKeyName)),
zap.Int64("restored-compact-revision", s.compactMainRev),
)
s.revMu.Unlock()
}
_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
_, scheduledCompactBytes := tx.UnsafeRange(MetaBucketName, scheduledCompactKeyName, nil, 0)
scheduledCompact := int64(0)
if len(scheduledCompactBytes) != 0 {
scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
@ -435,7 +435,7 @@ func (s *store) restore() error {
s.lg.Info(
"resume scheduled compaction",
zap.String("meta-bucket-name", string(metaBucketName)),
zap.String("meta-bucket-name", string(MetaBucketName)),
zap.String("meta-bucket-name-key", string(scheduledCompactKeyName)),
zap.Int64("scheduled-compact-revision", scheduledCompact),
)

View File

@ -51,7 +51,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
if len(keys) < s.cfg.CompactionBatchLimit {
rbytes := make([]byte, 8+1+8)
revToBytes(revision{main: compactMainRev}, rbytes)
tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes)
tx.UnsafePut(MetaBucketName, finishedCompactKeyName, rbytes)
tx.Unlock()
s.lg.Info(
"finished scheduled compaction",

View File

@ -88,7 +88,7 @@ func TestScheduleCompaction(t *testing.T) {
t.Errorf("#%d: range on %v = %d, want 1", i, rev, len(keys))
}
}
_, vals := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
_, vals := tx.UnsafeRange(MetaBucketName, finishedCompactKeyName, nil, 0)
revToBytes(revision{main: tt.rev}, ibytes)
if w := [][]byte{ibytes}; !reflect.DeepEqual(vals, w) {
t.Errorf("#%d: vals on %v = %+v, want %+v", i, finishedCompactKeyName, vals, w)

View File

@ -342,10 +342,10 @@ func TestStoreCompact(t *testing.T) {
end := make([]byte, 8)
binary.BigEndian.PutUint64(end, uint64(4))
wact := []testutil.Action{
{Name: "put", Params: []interface{}{metaBucketName, scheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
{Name: "put", Params: []interface{}{MetaBucketName, scheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
{Name: "range", Params: []interface{}{keyBucketName, make([]byte, 17), end, int64(10000)}},
{Name: "delete", Params: []interface{}{keyBucketName, key2}},
{Name: "put", Params: []interface{}{metaBucketName, finishedCompactKeyName, newTestRevBytes(revision{3, 0})}},
{Name: "put", Params: []interface{}{MetaBucketName, finishedCompactKeyName, newTestRevBytes(revision{3, 0})}},
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("tx actions = %+v, want %+v", g, wact)
@ -398,8 +398,8 @@ func TestStoreRestore(t *testing.T) {
t.Errorf("current rev = %v, want 5", s.currentRev)
}
wact := []testutil.Action{
{Name: "range", Params: []interface{}{metaBucketName, finishedCompactKeyName, []byte(nil), int64(0)}},
{Name: "range", Params: []interface{}{metaBucketName, scheduledCompactKeyName, []byte(nil), int64(0)}},
{Name: "range", Params: []interface{}{MetaBucketName, finishedCompactKeyName, []byte(nil), int64(0)}},
{Name: "range", Params: []interface{}{MetaBucketName, scheduledCompactKeyName, []byte(nil), int64(0)}},
{Name: "range", Params: []interface{}{keyBucketName, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}},
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
@ -484,7 +484,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
revToBytes(revision{main: 2}, rbytes)
tx := s0.b.BatchTx()
tx.Lock()
tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
tx.UnsafePut(MetaBucketName, scheduledCompactKeyName, rbytes)
tx.Unlock()
s0.Close()