Merge pull request #10892 from tbg/rawnode-everywhere-attempt3

raft: use RawNode for node's event loop; clean up bootstrap

Commit: 3c5e2f51e4
etcdserver/raft.go

@@ -479,7 +479,11 @@ func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id
 		}
 	}

-	n = raft.StartNode(c, peers)
+	if len(peers) == 0 {
+		n = raft.RestartNode(c)
+	} else {
+		n = raft.StartNode(c, peers)
+	}
 	raftStatusMu.Lock()
 	raftStatus = n.Status
 	raftStatusMu.Unlock()
80	raft/bootstrap.go	Normal file

@@ -0,0 +1,80 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raft
+
+import (
+	"errors"
+
+	pb "go.etcd.io/etcd/raft/raftpb"
+)
+
+// Bootstrap initializes the RawNode for first use by appending configuration
+// changes for the supplied peers. This method returns an error if the Storage
+// is nonempty.
+//
+// It is recommended that instead of calling this method, applications bootstrap
+// their state manually by setting up a Storage that has a first index > 1 and
+// which stores the desired ConfState as its InitialState.
+func (rn *RawNode) Bootstrap(peers []Peer) error {
+	if len(peers) == 0 {
+		return errors.New("must provide at least one peer to Bootstrap")
+	}
+	lastIndex, err := rn.raft.raftLog.storage.LastIndex()
+	if err != nil {
+		return err
+	}
+
+	if lastIndex != 0 {
+		return errors.New("can't bootstrap a nonempty Storage")
+	}
+
+	// We've faked out initial entries above, but nothing has been
+	// persisted. Start with an empty HardState (thus the first Ready will
+	// emit a HardState update for the app to persist).
+	rn.prevHardSt = emptyState
+
+	// TODO(tbg): remove StartNode and give the application the right tools to
+	// bootstrap the initial membership in a cleaner way.
+	rn.raft.becomeFollower(1, None)
+	ents := make([]pb.Entry, len(peers))
+	for i, peer := range peers {
+		cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
+		data, err := cc.Marshal()
+		if err != nil {
+			return err
+		}
+
+		ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
+	}
+	rn.raft.raftLog.append(ents...)
+
+	// Now apply them, mainly so that the application can call Campaign
+	// immediately after StartNode in tests. Note that these nodes will
+	// be added to raft twice: here and when the application's Ready
+	// loop calls ApplyConfChange. The calls to addNode must come after
+	// all calls to raftLog.append so progress.next is set after these
+	// bootstrapping entries (it is an error if we try to append these
+	// entries since they have already been committed).
+	// We do not set raftLog.applied so the application will be able
+	// to observe all conf changes via Ready.CommittedEntries.
+	//
+	// TODO(bdarnell): These entries are still unstable; do we need to preserve
+	// the invariant that committed < unstable?
+	rn.raft.raftLog.committed = uint64(len(ents))
+	for _, peer := range peers {
+		rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode})
+	}
+	return nil
+}
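The doc comment above recommends bootstrapping via the Storage rather than via Bootstrap(). A minimal sketch of what that can look like with MemoryStorage (the helper name and voter list are illustrative, not part of this diff); it mirrors the prototype exercised by TestRawNodeStart further down:

package main

import (
	"go.etcd.io/etcd/raft"
	pb "go.etcd.io/etcd/raft/raftpb"
)

// newBootstrappedStorage returns a MemoryStorage whose first index is > 1 and
// whose InitialState carries the desired ConfState, so NewRawNode can start
// from it without Bootstrap or faux ConfChange entries.
func newBootstrappedStorage(voters []uint64) (*raft.MemoryStorage, error) {
	s := raft.NewMemoryStorage()
	// Applying a snapshot at index 1 moves the storage's first index past 1
	// and records the ConfState that InitialState will return.
	err := s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{
		Index:     1,
		Term:      1,
		ConfState: pb.ConfState{Nodes: voters},
	}})
	return s, err
}

A follower bootstrapped this way cannot reach index 1 through the log, so it is forced to pick up the ConfState from the snapshot, which is the property the TODO in TestRawNodeStart calls out.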
121	raft/node.go

@@ -197,52 +197,22 @@ type Peer struct {
 // StartNode returns a new Node given configuration and a list of raft peers.
 // It appends a ConfChangeAddNode entry for each given peer to the initial log.
+//
+// Peers must not be zero length; call RestartNode in that case.
 func StartNode(c *Config, peers []Peer) Node {
-	r := newRaft(c)
-	// become the follower at term 1 and apply initial configuration
-	// entries of term 1
-	r.becomeFollower(1, None)
-	for _, peer := range peers {
-		cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
-		d, err := cc.Marshal()
-		if err != nil {
-			panic("unexpected marshal error")
-		}
-		// TODO(tbg): this should append the ConfChange for the own node first
-		// and also call applyConfChange below for that node first. Otherwise
-		// we have a Raft group (for a little while) that doesn't have itself
-		// in its config, which is bad.
-		// This whole way of setting things up is rickety. The app should just
-		// populate the initial ConfState appropriately and then all of this
-		// goes away.
-		e := pb.Entry{
-			Type:  pb.EntryConfChange,
-			Term:  1,
-			Index: r.raftLog.lastIndex() + 1,
-			Data:  d,
-		}
-		r.raftLog.append(e)
+	if len(peers) == 0 {
+		panic("no peers given; use RestartNode instead")
 	}
-	// Mark these initial entries as committed.
-	// TODO(bdarnell): These entries are still unstable; do we need to preserve
-	// the invariant that committed < unstable?
-	r.raftLog.committed = r.raftLog.lastIndex()
-	// Now apply them, mainly so that the application can call Campaign
-	// immediately after StartNode in tests. Note that these nodes will
-	// be added to raft twice: here and when the application's Ready
-	// loop calls ApplyConfChange. The calls to addNode must come after
-	// all calls to raftLog.append so progress.next is set after these
-	// bootstrapping entries (it is an error if we try to append these
-	// entries since they have already been committed).
-	// We do not set raftLog.applied so the application will be able
-	// to observe all conf changes via Ready.CommittedEntries.
-	for _, peer := range peers {
-		r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode})
+	rn, err := NewRawNode(c)
+	if err != nil {
+		panic(err)
 	}
+	rn.Bootstrap(peers)
+
 	n := newNode()
 	n.logger = c.Logger
-	go n.run(r)
+
+	go n.run(rn)
 	return &n
 }

@@ -251,11 +221,13 @@ func StartNode(c *Config, peers []Peer) Node {
 // If the caller has an existing state machine, pass in the last log index that
 // has been applied to it; otherwise use zero.
 func RestartNode(c *Config) Node {
-	r := newRaft(c)
-
+	rn, err := NewRawNode(c)
+	if err != nil {
+		panic(err)
+	}
 	n := newNode()
 	n.logger = c.Logger
-	go n.run(r)
+	go n.run(rn)
 	return &n
 }
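The etcdserver hunk at the top of this diff shows the pattern the new panic forces on callers: the application now decides explicitly between a fresh start and a restart. A hedged sketch of that decision (the helper name is illustrative):

package main

import "go.etcd.io/etcd/raft"

// startOrRestart mirrors the etcdserver change: StartNode now panics on an
// empty peer list, so the caller chooses between bootstrapping a new group
// and restarting from state already present in c.Storage.
func startOrRestart(c *raft.Config, peers []raft.Peer) raft.Node {
	if len(peers) == 0 {
		// Everything needed is already in c.Storage; don't bootstrap again.
		return raft.RestartNode(c)
	}
	return raft.StartNode(c, peers)
}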
@@ -310,30 +282,30 @@ func (n *node) Stop() {
 	<-n.done
 }

-func (n *node) run(r *raft) {
+func (n *node) run(rn *RawNode) {
 	var propc chan msgWithResult
 	var readyc chan Ready
 	var advancec chan struct{}
-	var prevLastUnstablei, prevLastUnstablet uint64
-	var havePrevLastUnstablei bool
-	var prevSnapi uint64
-	var applyingToI uint64
 	var rd Ready

+	r := rn.raft
+
 	lead := None
-	prevSoftSt := r.softState()
-	prevHardSt := emptyState

 	for {
 		if advancec != nil {
 			readyc = nil
-		} else {
-			rd = newReady(r, prevSoftSt, prevHardSt)
-			if rd.containsUpdates() {
-				readyc = n.readyc
-			} else {
-				readyc = nil
-			}
+		} else if rn.HasReady() {
+			// Populate a Ready. Note that this Ready is not guaranteed to
+			// actually be handled. We will arm readyc, but there's no guarantee
+			// that we will actually send on it. It's possible that we will
+			// service another channel instead, loop around, and then populate
+			// the Ready again. We could instead force the previous Ready to be
+			// handled first, but it's generally good to emit larger Readys plus
+			// it simplifies testing (by emitting less frequently and more
+			// predictably).
+			rd = rn.Ready()
+			readyc = n.readyc
 		}

 		if lead != r.lead {

@@ -382,40 +354,13 @@ func (n *node) run(r *raft) {
 			case <-n.done:
 			}
 		case <-n.tickc:
-			r.tick()
+			rn.Tick()
 		case readyc <- rd:
-			if rd.SoftState != nil {
-				prevSoftSt = rd.SoftState
-			}
-			if len(rd.Entries) > 0 {
-				prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
-				prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
-				havePrevLastUnstablei = true
-			}
-			if !IsEmptyHardState(rd.HardState) {
-				prevHardSt = rd.HardState
-			}
-			if !IsEmptySnap(rd.Snapshot) {
-				prevSnapi = rd.Snapshot.Metadata.Index
-			}
-			if index := rd.appliedCursor(); index != 0 {
-				applyingToI = index
-			}
-
-			r.msgs = nil
-			r.readStates = nil
-			r.reduceUncommittedSize(rd.CommittedEntries)
+			rn.acceptReady(rd)
 			advancec = n.advancec
 		case <-advancec:
-			if applyingToI != 0 {
-				r.raftLog.appliedTo(applyingToI)
-				applyingToI = 0
-			}
-			if havePrevLastUnstablei {
-				r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
-				havePrevLastUnstablei = false
-			}
-			r.raftLog.stableSnapTo(prevSnapi)
+			rn.commitReady(rd)
+			rd = Ready{}
 			advancec = nil
 		case c := <-n.status:
 			c <- getStatus(r)
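node.run now drives a RawNode instead of duplicating its bookkeeping. A minimal sketch of the same contract from an application's point of view (a single-goroutine loop; the storage and send function are assumed to be supplied by the caller and are not part of the diff):

package main

import (
	"time"

	"go.etcd.io/etcd/raft"
	pb "go.etcd.io/etcd/raft/raftpb"
)

// runLoop drives a RawNode the way the reworked node.run does: tick on a
// timer, check HasReady, persist and send the Ready's contents, then Advance.
func runLoop(rn *raft.RawNode, st *raft.MemoryStorage, send func([]pb.Message)) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for range ticker.C {
		rn.Tick()
		if !rn.HasReady() {
			continue
		}
		rd := rn.Ready()
		// Persist entries and HardState before sending messages.
		if err := st.Append(rd.Entries); err != nil {
			panic(err)
		}
		if !raft.IsEmptyHardState(rd.HardState) {
			st.SetHardState(rd.HardState)
		}
		send(rd.Messages)
		// rd.CommittedEntries would be applied to the state machine here.
		rn.Advance(rd)
	}
}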
raft/node_test.go

@@ -26,8 +26,8 @@ func BenchmarkOneNode(b *testing.B) {
 	n := newNode()
 	s := NewMemoryStorage()
-	r := newTestRaft(1, []uint64{1}, 10, 1, s)
-	go n.run(r)
+	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
+	go n.run(rn)

 	defer n.Stop()

@@ -132,9 +132,12 @@ func TestNodePropose(t *testing.T) {
 	n := newNode()
 	s := NewMemoryStorage()
-	r := newTestRaft(1, []uint64{1}, 10, 1, s)
-	go n.run(r)
-	n.Campaign(context.TODO())
+	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
+	r := rn.raft
+	go n.run(rn)
+	if err := n.Campaign(context.TODO()); err != nil {
+		t.Fatal(err)
+	}
 	for {
 		rd := <-n.Ready()
 		s.Append(rd.Entries)

@@ -172,10 +175,11 @@ func TestNodeReadIndex(t *testing.T) {
 	n := newNode()
 	s := NewMemoryStorage()
-	r := newTestRaft(1, []uint64{1}, 10, 1, s)
+	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
+	r := rn.raft
 	r.readStates = wrs

-	go n.run(r)
+	go n.run(rn)
 	n.Campaign(context.TODO())
 	for {
 		rd := <-n.Ready()

@@ -309,8 +313,9 @@ func TestNodeProposeConfig(t *testing.T) {
 	n := newNode()
 	s := NewMemoryStorage()
-	r := newTestRaft(1, []uint64{1}, 10, 1, s)
-	go n.run(r)
+	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
+	r := rn.raft
+	go n.run(rn)
 	n.Campaign(context.TODO())
 	for {
 		rd := <-n.Ready()

@@ -347,8 +352,8 @@ func TestNodeProposeConfig(t *testing.T) {
 func TestNodeProposeAddDuplicateNode(t *testing.T) {
 	n := newNode()
 	s := NewMemoryStorage()
-	r := newTestRaft(1, []uint64{1}, 10, 1, s)
-	go n.run(r)
+	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
+	go n.run(rn)
 	n.Campaign(context.TODO())
 	rdyEntries := make([]raftpb.Entry, 0)
 	ticker := time.NewTicker(time.Millisecond * 100)

@@ -422,8 +427,8 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) {
 // who is the current leader.
 func TestBlockProposal(t *testing.T) {
 	n := newNode()
-	r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
-	go n.run(r)
+	rn := newTestRawNode(1, []uint64{1}, 10, 1, NewMemoryStorage())
+	go n.run(rn)
 	defer n.Stop()

 	errc := make(chan error, 1)

@@ -463,8 +468,9 @@ func TestNodeProposeWaitDropped(t *testing.T) {
 	n := newNode()
 	s := NewMemoryStorage()
-	r := newTestRaft(1, []uint64{1}, 10, 1, s)
-	go n.run(r)
+	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
+	r := rn.raft
+	go n.run(rn)
 	n.Campaign(context.TODO())
 	for {
 		rd := <-n.Ready()

@@ -497,8 +503,9 @@ func TestNodeProposeWaitDropped(t *testing.T) {
 func TestNodeTick(t *testing.T) {
 	n := newNode()
 	s := NewMemoryStorage()
-	r := newTestRaft(1, []uint64{1}, 10, 1, s)
-	go n.run(r)
+	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
+	r := rn.raft
+	go n.run(rn)
 	elapsed := r.electionElapsed
 	n.Tick()

@@ -517,11 +524,11 @@ func TestNodeTick(t *testing.T) {
 func TestNodeStop(t *testing.T) {
 	n := newNode()
 	s := NewMemoryStorage()
-	r := newTestRaft(1, []uint64{1}, 10, 1, s)
+	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
 	donec := make(chan struct{})

 	go func() {
-		n.run(r)
+		n.run(rn)
 		close(donec)
 	}()

@@ -618,7 +625,9 @@ func TestNodeStart(t *testing.T) {
 		n.Advance()
 	}

-	n.Campaign(ctx)
+	if err := n.Campaign(ctx); err != nil {
+		t.Fatal(err)
+	}
 	rd := <-n.Ready()
 	storage.Append(rd.Entries)
 	n.Advance()

@@ -646,10 +655,12 @@ func TestNodeRestart(t *testing.T) {
 	st := raftpb.HardState{Term: 1, Commit: 1}

 	want := Ready{
-		HardState: st,
+		// No HardState is emitted because there was no change.
+		HardState: raftpb.HardState{},
 		// commit up to index commit index in st
 		CommittedEntries: entries[:st.Commit],
-		MustSync:         true,
+		// MustSync is false because no HardState or new entries are provided.
+		MustSync: false,
 	}

 	storage := NewMemoryStorage()

@@ -691,10 +702,14 @@ func TestNodeRestartFromSnapshot(t *testing.T) {
 	st := raftpb.HardState{Term: 1, Commit: 3}

 	want := Ready{
-		HardState: st,
+		// No HardState is emitted because nothing changed relative to what is
+		// already persisted.
+		HardState: raftpb.HardState{},
 		// commit up to index commit index in st
 		CommittedEntries: entries,
-		MustSync:         true,
+		// MustSync is only true when there is a new HardState or new entries;
+		// neither is the case here.
+		MustSync: false,
 	}

 	s := NewMemoryStorage()

@@ -800,8 +815,8 @@ func TestNodeProposeAddLearnerNode(t *testing.T) {
 	defer ticker.Stop()
 	n := newNode()
 	s := NewMemoryStorage()
-	r := newTestRaft(1, []uint64{1}, 10, 1, s)
-	go n.run(r)
+	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
+	go n.run(rn)
 	n.Campaign(context.TODO())
 	stop := make(chan struct{})
 	done := make(chan struct{})

@@ -895,9 +910,12 @@ func TestCommitPagination(t *testing.T) {
 	s := NewMemoryStorage()
 	cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
 	cfg.MaxCommittedSizePerReady = 2048
-	r := newRaft(cfg)
+	rn, err := NewRawNode(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
 	n := newNode()
-	go n.run(r)
+	go n.run(rn)
 	n.Campaign(context.TODO())

 	rd := readyWithTimeout(&n)

@@ -984,9 +1002,12 @@ func TestNodeCommitPaginationAfterRestart(t *testing.T) {
 	// this and *will* return it (which is how the Commit index ended up being 10 initially).
 	cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1

-	r := newRaft(cfg)
+	rn, err := NewRawNode(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
 	n := newNode()
-	go n.run(r)
+	go n.run(rn)
 	defer n.Stop()

 	rd := readyWithTimeout(&n)

@@ -997,57 +1018,3 @@ func TestNodeCommitPaginationAfterRestart(t *testing.T) {
 		)
 	}
 }
-
-// TestNodeBoundedLogGrowthWithPartition tests a scenario where a leader is
-// partitioned from a quorum of nodes. It verifies that the leader's log is
-// protected from unbounded growth even as new entries continue to be proposed.
-// This protection is provided by the MaxUncommittedEntriesSize configuration.
-func TestNodeBoundedLogGrowthWithPartition(t *testing.T) {
-	const maxEntries = 16
-	data := []byte("testdata")
-	testEntry := raftpb.Entry{Data: data}
-	maxEntrySize := uint64(maxEntries * PayloadSize(testEntry))
-
-	s := NewMemoryStorage()
-	cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
-	cfg.MaxUncommittedEntriesSize = maxEntrySize
-	r := newRaft(cfg)
-	n := newNode()
-	go n.run(r)
-	defer n.Stop()
-	n.Campaign(context.TODO())
-
-	rd := readyWithTimeout(&n)
-	if len(rd.CommittedEntries) != 1 {
-		t.Fatalf("expected 1 (empty) entry, got %d", len(rd.CommittedEntries))
-	}
-	s.Append(rd.Entries)
-	n.Advance()
-
-	// Simulate a network partition while we make our proposals by never
-	// committing anything. These proposals should not cause the leader's
-	// log to grow indefinitely.
-	for i := 0; i < 1024; i++ {
-		n.Propose(context.TODO(), data)
-	}
-
-	// Check the size of leader's uncommitted log tail. It should not exceed the
-	// MaxUncommittedEntriesSize limit.
-	checkUncommitted := func(exp uint64) {
-		t.Helper()
-		if a := r.uncommittedSize; exp != a {
-			t.Fatalf("expected %d uncommitted entry bytes, found %d", exp, a)
-		}
-	}
-	checkUncommitted(maxEntrySize)
-
-	// Recover from the partition. The uncommitted tail of the Raft log should
-	// disappear as entries are committed.
-	rd = readyWithTimeout(&n)
-	if len(rd.CommittedEntries) != maxEntries {
-		t.Fatalf("expected %d entries, got %d", maxEntries, len(rd.CommittedEntries))
-	}
-	s.Append(rd.Entries)
-	n.Advance()
-	checkUncommitted(0)
-}
raft/raft_test.go

@@ -4310,3 +4310,14 @@ func newTestLearnerRaft(id uint64, peers []uint64, learners []uint64, election,
 	cfg.learners = learners
 	return newRaft(cfg)
 }
+
+// newTestRawNode sets up a RawNode with the given peers. The configuration will
+// not be reflected in the Storage.
+func newTestRawNode(id uint64, peers []uint64, election, heartbeat int, storage Storage) *RawNode {
+	cfg := newTestConfig(id, peers, election, heartbeat, storage)
+	rn, err := NewRawNode(cfg)
+	if err != nil {
+		panic(err)
+	}
+	return rn
+}
135	raft/rawnode.go

@@ -37,40 +37,14 @@ type RawNode struct {
 	prevHardSt pb.HardState
 }

-func (rn *RawNode) newReady() Ready {
-	return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt)
-}
-
-func (rn *RawNode) commitReady(rd Ready) {
-	if rd.SoftState != nil {
-		rn.prevSoftSt = rd.SoftState
-	}
-	if !IsEmptyHardState(rd.HardState) {
-		rn.prevHardSt = rd.HardState
-	}
-
-	// If entries were applied (or a snapshot), update our cursor for
-	// the next Ready. Note that if the current HardState contains a
-	// new Commit index, this does not mean that we're also applying
-	// all of the new entries due to commit pagination by size.
-	if index := rd.appliedCursor(); index > 0 {
-		rn.raft.raftLog.appliedTo(index)
-	}
-
-	if len(rd.Entries) > 0 {
-		e := rd.Entries[len(rd.Entries)-1]
-		rn.raft.raftLog.stableTo(e.Index, e.Term)
-	}
-	if !IsEmptySnap(rd.Snapshot) {
-		rn.raft.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
-	}
-	if len(rd.ReadStates) != 0 {
-		rn.raft.readStates = nil
-	}
-}
-
-// NewRawNode returns a new RawNode given configuration and a list of raft peers.
-func NewRawNode(config *Config, peers []Peer) (*RawNode, error) {
+// NewRawNode instantiates a RawNode from the given configuration.
+//
+// See Bootstrap() for bootstrapping an initial state; this replaces the former
+// 'peers' argument to this method (with identical behavior). However, it is
+// recommended that instead of calling Bootstrap, applications bootstrap their
+// state manually by setting up a Storage that has a first index > 1 and which
+// stores the desired ConfState as its InitialState.
+func NewRawNode(config *Config) (*RawNode, error) {
 	if config.ID == 0 {
 		panic("config.ID must not be zero")
 	}

@@ -78,41 +52,8 @@ func NewRawNode(config *Config, peers []Peer) (*RawNode, error) {
 	rn := &RawNode{
 		raft: r,
 	}
-	lastIndex, err := config.Storage.LastIndex()
-	if err != nil {
-		panic(err) // TODO(bdarnell)
-	}
-	// If the log is empty, this is a new RawNode (like StartNode); otherwise it's
-	// restoring an existing RawNode (like RestartNode).
-	// TODO(bdarnell): rethink RawNode initialization and whether the application needs
-	// to be able to tell us when it expects the RawNode to exist.
-	if lastIndex == 0 {
-		r.becomeFollower(1, None)
-		ents := make([]pb.Entry, len(peers))
-		for i, peer := range peers {
-			cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
-			data, err := cc.Marshal()
-			if err != nil {
-				panic("unexpected marshal error")
-			}
-
-			ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
-		}
-		r.raftLog.append(ents...)
-		r.raftLog.committed = uint64(len(ents))
-		for _, peer := range peers {
-			r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode})
-		}
-	}
-
 	// Set the initial hard and soft states after performing all initialization.
 	rn.prevSoftSt = r.softState()
-	if lastIndex == 0 {
-		rn.prevHardSt = emptyState
-	} else {
-		rn.prevHardSt = r.hardState()
-	}
-
+	rn.prevHardSt = r.hardState()
 	return rn, nil
 }
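The dropped 'peers' parameter is the visible API break in this commit. A hedged before/after sketch of the migration for callers (the wrapper name and firstBoot flag are illustrative, not part of the diff):

package main

import "go.etcd.io/etcd/raft"

// newRawNodeCompat shows the migration: peers move from the NewRawNode call
// to an explicit Bootstrap, which must only run on a first boot, i.e. while
// the Storage is still empty.
func newRawNodeCompat(cfg *raft.Config, peers []raft.Peer, firstBoot bool) (*raft.RawNode, error) {
	rn, err := raft.NewRawNode(cfg) // was: raft.NewRawNode(cfg, peers)
	if err != nil {
		return nil, err
	}
	if firstBoot {
		if err := rn.Bootstrap(peers); err != nil {
			return nil, err
		}
	}
	return rn, nil
}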
@@ -182,14 +123,61 @@ func (rn *RawNode) Step(m pb.Message) error {
 	return ErrStepPeerNotFound
 }

-// Ready returns the current point-in-time state of this RawNode.
+// Ready returns the outstanding work that the application needs to handle. This
+// includes appending and applying entries or a snapshot, updating the HardState,
+// and sending messages. Ready() is a read-only operation, that is, it does not
+// require the caller to actually handle the result. Typically, a caller will
+// want to handle the Ready and must pass the Ready to Advance *after* having
+// done so. While a Ready is being handled, the RawNode must not be used for
+// operations that may alter its state. For example, it is illegal to call
+// Ready, followed by Step, followed by Advance.
 func (rn *RawNode) Ready() Ready {
 	rd := rn.newReady()
-	rn.raft.msgs = nil
-	rn.raft.reduceUncommittedSize(rd.CommittedEntries)
 	return rd
 }

+func (rn *RawNode) newReady() Ready {
+	return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt)
+}
+
+// acceptReady is called when the consumer of the RawNode has decided to go
+// ahead and handle a Ready. Nothing must alter the state of the RawNode between
+// this call and the prior call to Ready().
+func (rn *RawNode) acceptReady(rd Ready) {
+	if rd.SoftState != nil {
+		rn.prevSoftSt = rd.SoftState
+	}
+	if len(rd.ReadStates) != 0 {
+		rn.raft.readStates = nil
+	}
+	rn.raft.msgs = nil
+}
+
+// commitReady is called when the consumer of the RawNode has successfully
+// handled a Ready (having previously called acceptReady).
+func (rn *RawNode) commitReady(rd Ready) {
+	if !IsEmptyHardState(rd.HardState) {
+		rn.prevHardSt = rd.HardState
+	}
+
+	// If entries were applied (or a snapshot), update our cursor for
+	// the next Ready. Note that if the current HardState contains a
+	// new Commit index, this does not mean that we're also applying
+	// all of the new entries due to commit pagination by size.
+	if index := rd.appliedCursor(); index > 0 {
+		rn.raft.raftLog.appliedTo(index)
+	}
+	rn.raft.reduceUncommittedSize(rd.CommittedEntries)
+
+	if len(rd.Entries) > 0 {
+		e := rd.Entries[len(rd.Entries)-1]
+		rn.raft.raftLog.stableTo(e.Index, e.Term)
+	}
+	if !IsEmptySnap(rd.Snapshot) {
+		rn.raft.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
+	}
+}
+
 // HasReady called when RawNode user need to check if any Ready pending.
 // Checking logic in this method should be consistent with Ready.containsUpdates().
 func (rn *RawNode) HasReady() bool {

@@ -215,6 +203,11 @@ func (rn *RawNode) HasReady() bool {
 // Advance notifies the RawNode that the application has applied and saved progress in the
 // last Ready results.
 func (rn *RawNode) Advance(rd Ready) {
+	// Advance combines accept and commit. Callers can't mutate the RawNode
+	// between the call to Ready and the matching call to Advance, or the work
+	// done in acceptReady will clobber potentially newer data that has not been
+	// emitted in a Ready yet.
+	rn.acceptReady(rd)
 	rn.commitReady(rd)
 }
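A hedged sketch of the contract the comments above spell out: node.run splits acceptReady (at send time) from commitReady (on Advance), while external RawNode users get both in one Advance call, so nothing may mutate the RawNode in between (transport and apply are assumed application-supplied callbacks):

package main

import (
	"go.etcd.io/etcd/raft"
	pb "go.etcd.io/etcd/raft/raftpb"
)

// handleReady processes one Ready under the new contract: between Ready and
// Advance, no Step, Propose, or Tick may touch rn.
func handleReady(rn *raft.RawNode, st *raft.MemoryStorage, transport func([]pb.Message), apply func([]pb.Entry)) {
	if !rn.HasReady() {
		return
	}
	rd := rn.Ready() // read-only: nothing is consumed yet
	if err := st.Append(rd.Entries); err != nil {
		panic(err)
	}
	transport(rd.Messages)
	apply(rd.CommittedEntries)
	rn.Advance(rd) // acceptReady + commitReady in one call
	// Only after Advance is it legal to Step, Propose, or Tick again.
}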
raft/rawnode_test.go

@@ -18,11 +18,12 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"math"
 	"reflect"
 	"testing"

 	"go.etcd.io/etcd/raft/quorum"
-	"go.etcd.io/etcd/raft/raftpb"
+	pb "go.etcd.io/etcd/raft/raftpb"
 	"go.etcd.io/etcd/raft/tracker"
 )

@@ -61,28 +62,43 @@ func (a *rawNodeAdapter) ReadIndex(_ context.Context, rctx []byte) error {
 	// RawNode swallowed the error in ReadIndex, it probably should not do that.
 	return nil
 }
-func (a *rawNodeAdapter) Step(_ context.Context, m raftpb.Message) error { return a.RawNode.Step(m) }
-func (a *rawNodeAdapter) Propose(_ context.Context, data []byte) error { return a.RawNode.Propose(data) }
-func (a *rawNodeAdapter) ProposeConfChange(_ context.Context, cc raftpb.ConfChange) error {
+func (a *rawNodeAdapter) Step(_ context.Context, m pb.Message) error { return a.RawNode.Step(m) }
+func (a *rawNodeAdapter) Propose(_ context.Context, data []byte) error { return a.RawNode.Propose(data) }
+func (a *rawNodeAdapter) ProposeConfChange(_ context.Context, cc pb.ConfChange) error {
 	return a.RawNode.ProposeConfChange(cc)
 }

 // TestRawNodeStep ensures that RawNode.Step ignore local message.
 func TestRawNodeStep(t *testing.T) {
-	for i, msgn := range raftpb.MessageType_name {
-		s := NewMemoryStorage()
-		rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}})
-		if err != nil {
-			t.Fatal(err)
-		}
-		msgt := raftpb.MessageType(i)
-		err = rawNode.Step(raftpb.Message{Type: msgt})
-		// LocalMsg should be ignored.
-		if IsLocalMsg(msgt) {
-			if err != ErrStepLocalMsg {
-				t.Errorf("%d: step should ignore %s", msgt, msgn)
-			}
-		}
+	for i, msgn := range pb.MessageType_name {
+		t.Run(msgn, func(t *testing.T) {
+			s := NewMemoryStorage()
+			s.SetHardState(pb.HardState{Term: 1, Commit: 1})
+			s.Append([]pb.Entry{{Term: 1, Index: 1}})
+			if err := s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{
+				ConfState: pb.ConfState{
+					Nodes: []uint64{1},
+				},
+				Index: 1,
+				Term:  1,
+			}}); err != nil {
+				t.Fatal(err)
+			}
+			// Append an empty entry to make sure the non-local messages (like
+			// vote requests) are ignored and don't trigger assertions.
+			rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s))
+			if err != nil {
+				t.Fatal(err)
+			}
+			msgt := pb.MessageType(i)
+			err = rawNode.Step(pb.Message{Type: msgt})
+			// LocalMsg should be ignored.
+			if IsLocalMsg(msgt) {
+				if err != ErrStepLocalMsg {
+					t.Errorf("%d: step should ignore %s", msgt, msgn)
+				}
+			}
+		})
 	}
 }

@@ -94,17 +110,10 @@ func TestRawNodeStep(t *testing.T) {
 func TestRawNodeProposeAndConfChange(t *testing.T) {
 	s := NewMemoryStorage()
-	var err error
-	rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}})
+	rawNode, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s))
 	if err != nil {
 		t.Fatal(err)
 	}
-	rd := rawNode.Ready()
-	s.Append(rd.Entries)
-	rawNode.Advance(rd)
-
-	if d := rawNode.Ready(); d.MustSync || !IsEmptyHardState(d.HardState) || len(d.Entries) > 0 {
-		t.Fatalf("expected empty hard state with must-sync=false: %#v", d)
-	}
-
 	rawNode.Campaign()
 	proposed := false

@@ -113,13 +122,15 @@
 		ccdata []byte
 	)
 	for {
-		rd = rawNode.Ready()
+		rd := rawNode.Ready()
 		s.Append(rd.Entries)
+		rawNode.Advance(rd)
 		// Once we are the leader, propose a command and a ConfChange.
 		if !proposed && rd.SoftState.Lead == rawNode.raft.id {
-			rawNode.Propose([]byte("somedata"))
-
-			cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
+			if err = rawNode.Propose([]byte("somedata")); err != nil {
+				t.Fatal(err)
+			}
+			cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 1}
 			ccdata, err = cc.Marshal()
 			if err != nil {
 				t.Fatal(err)

@@ -127,16 +138,13 @@ func TestRawNodeProposeAndConfChange(t *testing.T) {
 			rawNode.ProposeConfChange(cc)

 			proposed = true
-		}
-		rawNode.Advance(rd)
-
-		// Exit when we have four entries: one ConfChange, one no-op for the election,
-		// our proposed command and proposed ConfChange.
-		lastIndex, err = s.LastIndex()
-		if err != nil {
-			t.Fatal(err)
-		}
-		if lastIndex >= 4 {
+		} else if proposed {
+			// We proposed last cycle, which means we appended the conf change
+			// in this cycle.
+			lastIndex, err = s.LastIndex()
+			if err != nil {
+				t.Fatal(err)
+			}
 			break
 		}
 	}

@@ -151,8 +159,8 @@ func TestRawNodeProposeAndConfChange(t *testing.T) {
 	if !bytes.Equal(entries[0].Data, []byte("somedata")) {
 		t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata"))
 	}
-	if entries[1].Type != raftpb.EntryConfChange {
-		t.Fatalf("type = %v, want %v", entries[1].Type, raftpb.EntryConfChange)
+	if entries[1].Type != pb.EntryConfChange {
+		t.Fatalf("type = %v, want %v", entries[1].Type, pb.EntryConfChange)
 	}
 	if !bytes.Equal(entries[1].Data, ccdata) {
 		t.Errorf("data = %v, want %v", entries[1].Data, ccdata)

@@ -163,7 +171,7 @@ func TestRawNodeProposeAndConfChange(t *testing.T) {
 // not affect the later propose to add new node.
 func TestRawNodeProposeAddDuplicateNode(t *testing.T) {
 	s := NewMemoryStorage()
-	rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}})
+	rawNode, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s))
 	if err != nil {
 		t.Fatal(err)
 	}

@@ -182,13 +190,13 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) {
 		rawNode.Advance(rd)
 	}

-	proposeConfChangeAndApply := func(cc raftpb.ConfChange) {
+	proposeConfChangeAndApply := func(cc pb.ConfChange) {
 		rawNode.ProposeConfChange(cc)
 		rd = rawNode.Ready()
 		s.Append(rd.Entries)
 		for _, entry := range rd.CommittedEntries {
-			if entry.Type == raftpb.EntryConfChange {
-				var cc raftpb.ConfChange
+			if entry.Type == pb.EntryConfChange {
+				var cc pb.ConfChange
 				cc.Unmarshal(entry.Data)
 				rawNode.ApplyConfChange(cc)
 			}

@@ -196,7 +204,7 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) {
 		rawNode.Advance(rd)
 	}

-	cc1 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
+	cc1 := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 1}
 	ccdata1, err := cc1.Marshal()
 	if err != nil {
 		t.Fatal(err)

@@ -207,7 +215,7 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) {
 	proposeConfChangeAndApply(cc1)

 	// the new node join should be ok
-	cc2 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2}
+	cc2 := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 2}
 	ccdata2, err := cc2.Marshal()
 	if err != nil {
 		t.Fatal(err)

@@ -238,16 +246,16 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) {
 // TestRawNodeReadIndex ensures that Rawnode.ReadIndex sends the MsgReadIndex message
 // to the underlying raft. It also ensures that ReadState can be read out.
 func TestRawNodeReadIndex(t *testing.T) {
-	msgs := []raftpb.Message{}
-	appendStep := func(r *raft, m raftpb.Message) error {
+	msgs := []pb.Message{}
+	appendStep := func(r *raft, m pb.Message) error {
 		msgs = append(msgs, m)
 		return nil
 	}
 	wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}

 	s := NewMemoryStorage()
-	c := newTestConfig(1, nil, 10, 1, s)
-	rawNode, err := NewRawNode(c, []Peer{{ID: 1}})
+	c := newTestConfig(1, []uint64{1}, 10, 1, s)
+	rawNode, err := NewRawNode(c)
 	if err != nil {
 		t.Fatal(err)
 	}

@@ -288,8 +296,8 @@ func TestRawNodeReadIndex(t *testing.T) {
 	if len(msgs) != 1 {
 		t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
 	}
-	if msgs[0].Type != raftpb.MsgReadIndex {
-		t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgReadIndex)
+	if msgs[0].Type != pb.MsgReadIndex {
+		t.Errorf("msg type = %d, want %d", msgs[0].Type, pb.MsgReadIndex)
 	}
 	if !bytes.Equal(msgs[0].Entries[0].Data, wrequestCtx) {
 		t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, wrequestCtx)
@@ -305,61 +313,108 @@ func TestRawNodeReadIndex(t *testing.T) {
 // TestNodeStop from node_test.go has no equivalent in rawNode because there is
 // no goroutine in RawNode.

-// TestRawNodeStart ensures that a node can be started correctly. The node should
-// start with correct configuration change entries, and can accept and commit
-// proposals.
+// TestRawNodeStart ensures that a node can be started correctly. Note that RawNode
+// requires the application to bootstrap the state, i.e. it does not accept peers
+// and will not create faux configuration change entries.
 func TestRawNodeStart(t *testing.T) {
-	cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
-	ccdata, err := cc.Marshal()
-	if err != nil {
-		t.Fatalf("unexpected marshal error: %v", err)
-	}
-	wants := []Ready{
-		{
-			HardState: raftpb.HardState{Term: 1, Commit: 1, Vote: 0},
-			Entries: []raftpb.Entry{
-				{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
-			},
-			CommittedEntries: []raftpb.Entry{
-				{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
-			},
-			MustSync: true,
-		},
-		{
-			HardState:        raftpb.HardState{Term: 2, Commit: 3, Vote: 1},
-			Entries:          []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
-			CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
-			MustSync:         true,
-		},
+	want := Ready{
+		SoftState: &SoftState{Lead: 1, RaftState: StateLeader},
+		HardState: pb.HardState{Term: 1, Commit: 3, Vote: 1},
+		Entries: []pb.Entry{
+			{Term: 1, Index: 2, Data: nil},           // empty entry
+			{Term: 1, Index: 3, Data: []byte("foo")}, // empty entry
+		},
+		CommittedEntries: []pb.Entry{
+			{Term: 1, Index: 2, Data: nil},           // empty entry
+			{Term: 1, Index: 3, Data: []byte("foo")}, // empty entry
+		},
+		MustSync: true,
 	}

 	storage := NewMemoryStorage()
-	rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage), []Peer{{ID: 1}})
+	storage.ents[0].Index = 1
+
+	// TODO(tbg): this is a first prototype of what bootstrapping could look
+	// like (without the annoying faux ConfChanges). We want to persist a
+	// ConfState at some index and make sure that this index can't be reached
+	// from log position 1, so that followers are forced to pick up the
+	// ConfState in order to move away from log position 1 (unless they got
+	// bootstrapped in the same way already). Failing to do so would mean that
+	// followers diverge from the bootstrapped nodes and don't learn about the
+	// initial config.
+	//
+	// NB: this is exactly what CockroachDB does. The Raft log really begins at
+	// index 10, so empty followers (at index 1) always need a snapshot first.
+	type appenderStorage interface {
+		Storage
+		ApplySnapshot(pb.Snapshot) error
+	}
+	bootstrap := func(storage appenderStorage, cs pb.ConfState) error {
+		if len(cs.Nodes) == 0 {
+			return fmt.Errorf("no voters specified")
+		}
+		fi, err := storage.FirstIndex()
+		if err != nil {
+			return err
+		}
+		if fi < 2 {
+			return fmt.Errorf("FirstIndex >= 2 is prerequisite for bootstrap")
+		}
+		if _, err = storage.Entries(fi, fi, math.MaxUint64); err == nil {
+			// TODO(tbg): match exact error
+			return fmt.Errorf("should not have been able to load first index")
+		}
+		li, err := storage.LastIndex()
+		if err != nil {
+			return err
+		}
+		if _, err = storage.Entries(li, li, math.MaxUint64); err == nil {
+			// TODO(tbg): match exact error
+			return fmt.Errorf("should not have been able to load last index")
+		}
+		hs, ics, err := storage.InitialState()
+		if err != nil {
+			return err
+		}
+		if !IsEmptyHardState(hs) {
+			return fmt.Errorf("HardState not empty")
+		}
+		if len(ics.Nodes) != 0 {
+			return fmt.Errorf("ConfState not empty")
+		}
+
+		meta := pb.SnapshotMetadata{
+			Index:     1,
+			Term:      0,
+			ConfState: cs,
+		}
+		snap := pb.Snapshot{Metadata: meta}
+		return storage.ApplySnapshot(snap)
+	}
+
+	if err := bootstrap(storage, pb.ConfState{Nodes: []uint64{1}}); err != nil {
+		t.Fatal(err)
+	}
+
+	rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage))
 	if err != nil {
 		t.Fatal(err)
 	}
-	rd := rawNode.Ready()
-	t.Logf("rd %v", rd)
-	if !reflect.DeepEqual(rd, wants[0]) {
-		t.Fatalf("#%d: g = %+v,\n w %+v", 1, rd, wants[0])
-	} else {
-		storage.Append(rd.Entries)
-		rawNode.Advance(rd)
+	if rawNode.HasReady() {
+		t.Fatalf("unexpected ready: %+v", rawNode.Ready())
 	}
-	storage.Append(rd.Entries)
-	rawNode.Advance(rd)

 	rawNode.Campaign()
-	rd = rawNode.Ready()
+	rawNode.Propose([]byte("foo"))
+	if !rawNode.HasReady() {
+		t.Fatal("expected a Ready")
+	}
+	rd := rawNode.Ready()
 	storage.Append(rd.Entries)
 	rawNode.Advance(rd)

-	rawNode.Propose([]byte("foo"))
-	if rd = rawNode.Ready(); !reflect.DeepEqual(rd, wants[1]) {
-		t.Errorf("#%d: g = %+v,\n w %+v", 2, rd, wants[1])
-	} else {
-		storage.Append(rd.Entries)
-		rawNode.Advance(rd)
-	}
+	rd.SoftState, want.SoftState = nil, nil
+
+	if !reflect.DeepEqual(rd, want) {
+		t.Fatalf("unexpected Ready:\n%+v\nvs\n%+v", rd, want)
+	}

 	if rawNode.HasReady() {
@@ -368,11 +423,11 @@ func TestRawNodeStart(t *testing.T) {
 }

 func TestRawNodeRestart(t *testing.T) {
-	entries := []raftpb.Entry{
+	entries := []pb.Entry{
 		{Term: 1, Index: 1},
 		{Term: 1, Index: 2, Data: []byte("foo")},
 	}
-	st := raftpb.HardState{Term: 1, Commit: 1}
+	st := pb.HardState{Term: 1, Commit: 1}

 	want := Ready{
 		HardState: emptyState,

@@ -384,7 +439,7 @@ func TestRawNodeRestart(t *testing.T) {
 	storage := NewMemoryStorage()
 	storage.SetHardState(st)
 	storage.Append(entries)
-	rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage), nil)
+	rawNode, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, storage))
 	if err != nil {
 		t.Fatal(err)
 	}

@@ -399,17 +454,17 @@ func TestRawNodeRestart(t *testing.T) {
 }

 func TestRawNodeRestartFromSnapshot(t *testing.T) {
-	snap := raftpb.Snapshot{
-		Metadata: raftpb.SnapshotMetadata{
-			ConfState: raftpb.ConfState{Nodes: []uint64{1, 2}},
+	snap := pb.Snapshot{
+		Metadata: pb.SnapshotMetadata{
+			ConfState: pb.ConfState{Nodes: []uint64{1, 2}},
 			Index:     2,
 			Term:      1,
 		},
 	}
-	entries := []raftpb.Entry{
+	entries := []pb.Entry{
 		{Term: 1, Index: 3, Data: []byte("foo")},
 	}
-	st := raftpb.HardState{Term: 1, Commit: 3}
+	st := pb.HardState{Term: 1, Commit: 3}

 	want := Ready{
 		HardState: emptyState,

@@ -422,7 +477,7 @@ func TestRawNodeRestartFromSnapshot(t *testing.T) {
 	s.SetHardState(st)
 	s.ApplySnapshot(snap)
 	s.Append(entries)
-	rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), nil)
+	rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s))
 	if err != nil {
 		t.Fatal(err)
 	}

@@ -441,7 +496,7 @@ func TestRawNodeRestartFromSnapshot(t *testing.T) {

 func TestRawNodeStatus(t *testing.T) {
 	s := NewMemoryStorage()
-	rn, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s), nil)
+	rn, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s))
 	if err != nil {
 		t.Fatal(err)
 	}

@@ -489,20 +544,20 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) {
 	s := &ignoreSizeHintMemStorage{
 		MemoryStorage: NewMemoryStorage(),
 	}
-	persistedHardState := raftpb.HardState{
+	persistedHardState := pb.HardState{
 		Term:   1,
 		Vote:   1,
 		Commit: 10,
 	}

 	s.hardState = persistedHardState
-	s.ents = make([]raftpb.Entry, 10)
+	s.ents = make([]pb.Entry, 10)
 	var size uint64
 	for i := range s.ents {
-		ent := raftpb.Entry{
+		ent := pb.Entry{
 			Term:  1,
 			Index: uint64(i + 1),
-			Type:  raftpb.EntryNormal,
+			Type:  pb.EntryNormal,
 			Data:  []byte("a"),
 		}

@@ -516,14 +571,14 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) {
 	// this and *will* return it (which is how the Commit index ended up being 10 initially).
 	cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1

-	s.ents = append(s.ents, raftpb.Entry{
+	s.ents = append(s.ents, pb.Entry{
 		Term:  1,
 		Index: uint64(11),
-		Type:  raftpb.EntryNormal,
+		Type:  pb.EntryNormal,
 		Data:  []byte("boom"),
 	})

-	rawNode, err := NewRawNode(cfg, []Peer{{ID: 1}})
+	rawNode, err := NewRawNode(cfg)
 	if err != nil {
 		t.Fatal(err)
 	}

@@ -539,8 +594,8 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) {
 	}
 	highestApplied = rd.CommittedEntries[n-1].Index
 	rawNode.Advance(rd)
-	rawNode.Step(raftpb.Message{
-		Type: raftpb.MsgHeartbeat,
+	rawNode.Step(pb.Message{
+		Type: pb.MsgHeartbeat,
 		To:   1,
 		From: 1, // illegal, but we get away with it
 		Term: 1,

@@ -556,13 +611,13 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) {
 func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) {
 	const maxEntries = 16
 	data := []byte("testdata")
-	testEntry := raftpb.Entry{Data: data}
+	testEntry := pb.Entry{Data: data}
 	maxEntrySize := uint64(maxEntries * PayloadSize(testEntry))

 	s := NewMemoryStorage()
 	cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
 	cfg.MaxUncommittedEntriesSize = maxEntrySize
-	rawNode, err := NewRawNode(cfg, []Peer{{ID: 1}})
+	rawNode, err := NewRawNode(cfg)
 	if err != nil {
 		t.Fatal(err)
 	}