mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
raft: clean up bootstrap
This is the first (maybe not last) step in cleaning up the bootstrap code around StartNode. Initializing a Raft group for the first time is awkward, since a configuration has to be pulled from thin air. The way this is solved today is unclean: The app is supposed to pass peers to StartNode(), we add configuration changes for them to the log, immediately pretend that they are applied, but actually leave them unapplied (to give the app a chance to observe them, though if the app did decide to not apply them things would really go off the rails), and then return control to the app. The app will then process the initial Readys and as a result the configuration will be persisted to disk; restarts of the node then use RestartNode which doesn't take any peers. The code that did this lived awkwardly in two places fairly deep down the callstack, though it was really only necessary in StartNode(). This commit refactors things to make this more obvious: only StartNode does this dance now. In particular, RawNode does not support this at all any more; it expects the app to set up its Storage correctly. Future work may provide helpers to make this "preseeding" of the Storage more user-friendly. It isn't entirely straightforward to do so since the Storage interface doesn't provide the right accessors for this purpose. Briefly speaking, we want to make sure that a non-bootstrapped node can never catch up via the log so that we can implicitly use one of the "skipped" log entries to represent the configuration change into the bootstrap configuration. This is an invasive change that affects all consumers of raft, and it is of lower urgency since the code (post this commit) already encapsulates the complexity sufficiently.
This commit is contained in:
parent
c62b7048b5
commit
c9491d7861
@ -479,7 +479,11 @@ func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id
|
||||
}
|
||||
}
|
||||
|
||||
n = raft.StartNode(c, peers)
|
||||
if len(peers) == 0 {
|
||||
n = raft.RestartNode(c)
|
||||
} else {
|
||||
n = raft.StartNode(c, peers)
|
||||
}
|
||||
raftStatusMu.Lock()
|
||||
raftStatus = n.Status
|
||||
raftStatusMu.Unlock()
|
||||
|
62
raft/node.go
62
raft/node.go
@ -197,12 +197,63 @@ type Peer struct {
|
||||
|
||||
// StartNode returns a new Node given configuration and a list of raft peers.
|
||||
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
|
||||
//
|
||||
// Peers must not be zero length; call RestartNode in that case.
|
||||
func StartNode(c *Config, peers []Peer) Node {
|
||||
rn, err := NewRawNode(c, peers)
|
||||
if len(peers) == 0 {
|
||||
panic("no peers given; use RestartNode instead")
|
||||
}
|
||||
rn, err := NewRawNode(c)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
lastIndex, err := rn.raft.raftLog.storage.LastIndex()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if lastIndex != 0 {
|
||||
panic("can't StartNode on a nonempty Storage")
|
||||
}
|
||||
|
||||
// We've faked out initial entries above, but nothing has been
|
||||
// persisted. Start with an empty HardState (thus the first Ready will
|
||||
// emit a HardState update for the app to persist).
|
||||
rn.prevHardSt = emptyState
|
||||
|
||||
// TODO(tbg): remove StartNode and give the application the right tools to
|
||||
// bootstrap the initial membership in a cleaner way.
|
||||
rn.raft.becomeFollower(1, None)
|
||||
ents := make([]pb.Entry, len(peers))
|
||||
for i, peer := range peers {
|
||||
cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
|
||||
data, err := cc.Marshal()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
|
||||
}
|
||||
rn.raft.raftLog.append(ents...)
|
||||
|
||||
// Now apply them, mainly so that the application can call Campaign
|
||||
// immediately after StartNode in tests. Note that these nodes will
|
||||
// be added to raft twice: here and when the application's Ready
|
||||
// loop calls ApplyConfChange. The calls to addNode must come after
|
||||
// all calls to raftLog.append so progress.next is set after these
|
||||
// bootstrapping entries (it is an error if we try to append these
|
||||
// entries since they have already been committed).
|
||||
// We do not set raftLog.applied so the application will be able
|
||||
// to observe all conf changes via Ready.CommittedEntries.
|
||||
//
|
||||
// TODO(bdarnell): These entries are still unstable; do we need to preserve
|
||||
// the invariant that committed < unstable?
|
||||
rn.raft.raftLog.committed = uint64(len(ents))
|
||||
for _, peer := range peers {
|
||||
rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode})
|
||||
}
|
||||
|
||||
n := newNode()
|
||||
n.logger = c.Logger
|
||||
|
||||
@ -215,7 +266,14 @@ func StartNode(c *Config, peers []Peer) Node {
|
||||
// If the caller has an existing state machine, pass in the last log index that
|
||||
// has been applied to it; otherwise use zero.
|
||||
func RestartNode(c *Config) Node {
|
||||
return StartNode(c, nil)
|
||||
rn, err := NewRawNode(c)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
n := newNode()
|
||||
n.logger = c.Logger
|
||||
go n.run(rn)
|
||||
return &n
|
||||
}
|
||||
|
||||
type msgWithResult struct {
|
||||
|
@ -910,7 +910,7 @@ func TestCommitPagination(t *testing.T) {
|
||||
s := NewMemoryStorage()
|
||||
cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
|
||||
cfg.MaxCommittedSizePerReady = 2048
|
||||
rn, err := NewRawNode(cfg, nil)
|
||||
rn, err := NewRawNode(cfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -1002,7 +1002,7 @@ func TestNodeCommitPaginationAfterRestart(t *testing.T) {
|
||||
// this and *will* return it (which is how the Commit index ended up being 10 initially).
|
||||
cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1
|
||||
|
||||
rn, err := NewRawNode(cfg, nil)
|
||||
rn, err := NewRawNode(cfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -1032,7 +1032,7 @@ func TestNodeBoundedLogGrowthWithPartition(t *testing.T) {
|
||||
s := NewMemoryStorage()
|
||||
cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
|
||||
cfg.MaxUncommittedEntriesSize = maxEntrySize
|
||||
rn, err := NewRawNode(cfg, nil)
|
||||
rn, err := NewRawNode(cfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -4311,9 +4311,11 @@ func newTestLearnerRaft(id uint64, peers []uint64, learners []uint64, election,
|
||||
return newRaft(cfg)
|
||||
}
|
||||
|
||||
// newTestRawNode sets up a RawNode with the given peers. The configuration will
|
||||
// not be reflected in the Storage.
|
||||
func newTestRawNode(id uint64, peers []uint64, election, heartbeat int, storage Storage) *RawNode {
|
||||
cfg := newTestConfig(id, peers, election, heartbeat, storage)
|
||||
rn, err := NewRawNode(cfg, nil)
|
||||
rn, err := NewRawNode(cfg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -37,8 +37,8 @@ type RawNode struct {
|
||||
prevHardSt pb.HardState
|
||||
}
|
||||
|
||||
// NewRawNode returns a new RawNode given configuration and a list of raft peers.
|
||||
func NewRawNode(config *Config, peers []Peer) (*RawNode, error) {
|
||||
// NewRawNode instantiates a RawNode from the given configuration.
|
||||
func NewRawNode(config *Config) (*RawNode, error) {
|
||||
if config.ID == 0 {
|
||||
panic("config.ID must not be zero")
|
||||
}
|
||||
@ -46,63 +46,9 @@ func NewRawNode(config *Config, peers []Peer) (*RawNode, error) {
|
||||
rn := &RawNode{
|
||||
raft: r,
|
||||
}
|
||||
if err := rn.init(peers); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rn, nil
|
||||
}
|
||||
|
||||
func (rn *RawNode) init(peers []Peer) error {
|
||||
r := rn.raft
|
||||
lastIndex, err := rn.raft.raftLog.storage.LastIndex()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// If the log is empty, this is a new RawNode (like StartNode); otherwise it's
|
||||
// restoring an existing RawNode (like RestartNode).
|
||||
// TODO(bdarnell): rethink RawNode initialization and whether the application needs
|
||||
// to be able to tell us when it expects the RawNode to exist.
|
||||
if lastIndex == 0 {
|
||||
rn.raft.becomeFollower(1, None)
|
||||
ents := make([]pb.Entry, len(peers))
|
||||
for i, peer := range peers {
|
||||
cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
|
||||
data, err := cc.Marshal()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
|
||||
}
|
||||
rn.raft.raftLog.append(ents...)
|
||||
|
||||
// Now apply them, mainly so that the application can call Campaign
|
||||
// immediately after StartNode in tests. Note that these nodes will
|
||||
// be added to raft twice: here and when the application's Ready
|
||||
// loop calls ApplyConfChange. The calls to addNode must come after
|
||||
// all calls to raftLog.append so progress.next is set after these
|
||||
// bootstrapping entries (it is an error if we try to append these
|
||||
// entries since they have already been committed).
|
||||
// We do not set raftLog.applied so the application will be able
|
||||
// to observe all conf changes via Ready.CommittedEntries.
|
||||
//
|
||||
// TODO(bdarnell): These entries are still unstable; do we need to preserve
|
||||
// the invariant that committed < unstable?
|
||||
r.raftLog.committed = uint64(len(ents))
|
||||
for _, peer := range peers {
|
||||
r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode})
|
||||
}
|
||||
}
|
||||
|
||||
// Set the initial hard and soft states after performing all initialization.
|
||||
rn.prevSoftSt = r.softState()
|
||||
if lastIndex == 0 {
|
||||
rn.prevHardSt = emptyState
|
||||
} else {
|
||||
rn.prevHardSt = r.hardState()
|
||||
}
|
||||
|
||||
return nil
|
||||
rn.prevHardSt = r.hardState()
|
||||
return rn, nil
|
||||
}
|
||||
|
||||
// Tick advances the internal logical clock by a single tick.
|
||||
|
@ -18,11 +18,12 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"go.etcd.io/etcd/raft/quorum"
|
||||
"go.etcd.io/etcd/raft/raftpb"
|
||||
pb "go.etcd.io/etcd/raft/raftpb"
|
||||
"go.etcd.io/etcd/raft/tracker"
|
||||
)
|
||||
|
||||
@ -61,28 +62,43 @@ func (a *rawNodeAdapter) ReadIndex(_ context.Context, rctx []byte) error {
|
||||
// RawNode swallowed the error in ReadIndex, it probably should not do that.
|
||||
return nil
|
||||
}
|
||||
func (a *rawNodeAdapter) Step(_ context.Context, m raftpb.Message) error { return a.RawNode.Step(m) }
|
||||
func (a *rawNodeAdapter) Propose(_ context.Context, data []byte) error { return a.RawNode.Propose(data) }
|
||||
func (a *rawNodeAdapter) ProposeConfChange(_ context.Context, cc raftpb.ConfChange) error {
|
||||
func (a *rawNodeAdapter) Step(_ context.Context, m pb.Message) error { return a.RawNode.Step(m) }
|
||||
func (a *rawNodeAdapter) Propose(_ context.Context, data []byte) error { return a.RawNode.Propose(data) }
|
||||
func (a *rawNodeAdapter) ProposeConfChange(_ context.Context, cc pb.ConfChange) error {
|
||||
return a.RawNode.ProposeConfChange(cc)
|
||||
}
|
||||
|
||||
// TestRawNodeStep ensures that RawNode.Step ignore local message.
|
||||
func TestRawNodeStep(t *testing.T) {
|
||||
for i, msgn := range raftpb.MessageType_name {
|
||||
s := NewMemoryStorage()
|
||||
rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
msgt := raftpb.MessageType(i)
|
||||
err = rawNode.Step(raftpb.Message{Type: msgt})
|
||||
// LocalMsg should be ignored.
|
||||
if IsLocalMsg(msgt) {
|
||||
if err != ErrStepLocalMsg {
|
||||
t.Errorf("%d: step should ignore %s", msgt, msgn)
|
||||
for i, msgn := range pb.MessageType_name {
|
||||
t.Run(msgn, func(t *testing.T) {
|
||||
s := NewMemoryStorage()
|
||||
s.SetHardState(pb.HardState{Term: 1, Commit: 1})
|
||||
s.Append([]pb.Entry{{Term: 1, Index: 1}})
|
||||
if err := s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{
|
||||
ConfState: pb.ConfState{
|
||||
Nodes: []uint64{1},
|
||||
},
|
||||
Index: 1,
|
||||
Term: 1,
|
||||
}}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
// Append an empty entry to make sure the non-local messages (like
|
||||
// vote requests) are ignored and don't trigger assertions.
|
||||
rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
msgt := pb.MessageType(i)
|
||||
err = rawNode.Step(pb.Message{Type: msgt})
|
||||
// LocalMsg should be ignored.
|
||||
if IsLocalMsg(msgt) {
|
||||
if err != ErrStepLocalMsg {
|
||||
t.Errorf("%d: step should ignore %s", msgt, msgn)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -94,17 +110,10 @@ func TestRawNodeStep(t *testing.T) {
|
||||
func TestRawNodeProposeAndConfChange(t *testing.T) {
|
||||
s := NewMemoryStorage()
|
||||
var err error
|
||||
rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}})
|
||||
rawNode, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
rd := rawNode.Ready()
|
||||
s.Append(rd.Entries)
|
||||
rawNode.Advance(rd)
|
||||
|
||||
if d := rawNode.Ready(); d.MustSync || !IsEmptyHardState(d.HardState) || len(d.Entries) > 0 {
|
||||
t.Fatalf("expected empty hard state with must-sync=false: %#v", d)
|
||||
}
|
||||
|
||||
rawNode.Campaign()
|
||||
proposed := false
|
||||
@ -113,13 +122,15 @@ func TestRawNodeProposeAndConfChange(t *testing.T) {
|
||||
ccdata []byte
|
||||
)
|
||||
for {
|
||||
rd = rawNode.Ready()
|
||||
rd := rawNode.Ready()
|
||||
s.Append(rd.Entries)
|
||||
rawNode.Advance(rd)
|
||||
// Once we are the leader, propose a command and a ConfChange.
|
||||
if !proposed && rd.SoftState.Lead == rawNode.raft.id {
|
||||
rawNode.Propose([]byte("somedata"))
|
||||
|
||||
cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
|
||||
if err = rawNode.Propose([]byte("somedata")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 1}
|
||||
ccdata, err = cc.Marshal()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -127,16 +138,13 @@ func TestRawNodeProposeAndConfChange(t *testing.T) {
|
||||
rawNode.ProposeConfChange(cc)
|
||||
|
||||
proposed = true
|
||||
}
|
||||
rawNode.Advance(rd)
|
||||
|
||||
// Exit when we have four entries: one ConfChange, one no-op for the election,
|
||||
// our proposed command and proposed ConfChange.
|
||||
lastIndex, err = s.LastIndex()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if lastIndex >= 4 {
|
||||
} else if proposed {
|
||||
// We proposed last cycle, which means we appended the conf change
|
||||
// in this cycle.
|
||||
lastIndex, err = s.LastIndex()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
@ -151,8 +159,8 @@ func TestRawNodeProposeAndConfChange(t *testing.T) {
|
||||
if !bytes.Equal(entries[0].Data, []byte("somedata")) {
|
||||
t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata"))
|
||||
}
|
||||
if entries[1].Type != raftpb.EntryConfChange {
|
||||
t.Fatalf("type = %v, want %v", entries[1].Type, raftpb.EntryConfChange)
|
||||
if entries[1].Type != pb.EntryConfChange {
|
||||
t.Fatalf("type = %v, want %v", entries[1].Type, pb.EntryConfChange)
|
||||
}
|
||||
if !bytes.Equal(entries[1].Data, ccdata) {
|
||||
t.Errorf("data = %v, want %v", entries[1].Data, ccdata)
|
||||
@ -163,7 +171,7 @@ func TestRawNodeProposeAndConfChange(t *testing.T) {
|
||||
// not affect the later propose to add new node.
|
||||
func TestRawNodeProposeAddDuplicateNode(t *testing.T) {
|
||||
s := NewMemoryStorage()
|
||||
rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}})
|
||||
rawNode, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -182,13 +190,13 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) {
|
||||
rawNode.Advance(rd)
|
||||
}
|
||||
|
||||
proposeConfChangeAndApply := func(cc raftpb.ConfChange) {
|
||||
proposeConfChangeAndApply := func(cc pb.ConfChange) {
|
||||
rawNode.ProposeConfChange(cc)
|
||||
rd = rawNode.Ready()
|
||||
s.Append(rd.Entries)
|
||||
for _, entry := range rd.CommittedEntries {
|
||||
if entry.Type == raftpb.EntryConfChange {
|
||||
var cc raftpb.ConfChange
|
||||
if entry.Type == pb.EntryConfChange {
|
||||
var cc pb.ConfChange
|
||||
cc.Unmarshal(entry.Data)
|
||||
rawNode.ApplyConfChange(cc)
|
||||
}
|
||||
@ -196,7 +204,7 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) {
|
||||
rawNode.Advance(rd)
|
||||
}
|
||||
|
||||
cc1 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
|
||||
cc1 := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 1}
|
||||
ccdata1, err := cc1.Marshal()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -207,7 +215,7 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) {
|
||||
proposeConfChangeAndApply(cc1)
|
||||
|
||||
// the new node join should be ok
|
||||
cc2 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2}
|
||||
cc2 := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 2}
|
||||
ccdata2, err := cc2.Marshal()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -238,16 +246,16 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) {
|
||||
// TestRawNodeReadIndex ensures that Rawnode.ReadIndex sends the MsgReadIndex message
|
||||
// to the underlying raft. It also ensures that ReadState can be read out.
|
||||
func TestRawNodeReadIndex(t *testing.T) {
|
||||
msgs := []raftpb.Message{}
|
||||
appendStep := func(r *raft, m raftpb.Message) error {
|
||||
msgs := []pb.Message{}
|
||||
appendStep := func(r *raft, m pb.Message) error {
|
||||
msgs = append(msgs, m)
|
||||
return nil
|
||||
}
|
||||
wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}
|
||||
|
||||
s := NewMemoryStorage()
|
||||
c := newTestConfig(1, nil, 10, 1, s)
|
||||
rawNode, err := NewRawNode(c, []Peer{{ID: 1}})
|
||||
c := newTestConfig(1, []uint64{1}, 10, 1, s)
|
||||
rawNode, err := NewRawNode(c)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -288,8 +296,8 @@ func TestRawNodeReadIndex(t *testing.T) {
|
||||
if len(msgs) != 1 {
|
||||
t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
|
||||
}
|
||||
if msgs[0].Type != raftpb.MsgReadIndex {
|
||||
t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgReadIndex)
|
||||
if msgs[0].Type != pb.MsgReadIndex {
|
||||
t.Errorf("msg type = %d, want %d", msgs[0].Type, pb.MsgReadIndex)
|
||||
}
|
||||
if !bytes.Equal(msgs[0].Entries[0].Data, wrequestCtx) {
|
||||
t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, wrequestCtx)
|
||||
@ -305,61 +313,108 @@ func TestRawNodeReadIndex(t *testing.T) {
|
||||
// TestNodeStop from node_test.go has no equivalent in rawNode because there is
|
||||
// no goroutine in RawNode.
|
||||
|
||||
// TestRawNodeStart ensures that a node can be started correctly. The node should
|
||||
// start with correct configuration change entries, and can accept and commit
|
||||
// proposals.
|
||||
// TestRawNodeStart ensures that a node can be started correctly. Note that RawNode
|
||||
// requires the application to bootstrap the state, i.e. it does not accept peers
|
||||
// and will not create faux configuration change entries.
|
||||
func TestRawNodeStart(t *testing.T) {
|
||||
cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
|
||||
ccdata, err := cc.Marshal()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected marshal error: %v", err)
|
||||
}
|
||||
wants := []Ready{
|
||||
{
|
||||
HardState: raftpb.HardState{Term: 1, Commit: 1, Vote: 0},
|
||||
Entries: []raftpb.Entry{
|
||||
{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
|
||||
},
|
||||
CommittedEntries: []raftpb.Entry{
|
||||
{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
|
||||
},
|
||||
MustSync: true,
|
||||
want := Ready{
|
||||
SoftState: &SoftState{Lead: 1, RaftState: StateLeader},
|
||||
HardState: pb.HardState{Term: 1, Commit: 3, Vote: 1},
|
||||
Entries: []pb.Entry{
|
||||
{Term: 1, Index: 2, Data: nil}, // empty entry
|
||||
{Term: 1, Index: 3, Data: []byte("foo")}, // empty entry
|
||||
},
|
||||
{
|
||||
HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1},
|
||||
Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
|
||||
CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
|
||||
MustSync: true,
|
||||
CommittedEntries: []pb.Entry{
|
||||
{Term: 1, Index: 2, Data: nil}, // empty entry
|
||||
{Term: 1, Index: 3, Data: []byte("foo")}, // empty entry
|
||||
},
|
||||
MustSync: true,
|
||||
}
|
||||
|
||||
storage := NewMemoryStorage()
|
||||
rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage), []Peer{{ID: 1}})
|
||||
storage.ents[0].Index = 1
|
||||
|
||||
// TODO(tbg): this is a first prototype of what bootstrapping could look
|
||||
// like (without the annoying faux ConfChanges). We want to persist a
|
||||
// ConfState at some index and make sure that this index can't be reached
|
||||
// from log position 1, so that followers are forced to pick up the
|
||||
// ConfState in order to move away from log position 1 (unless they got
|
||||
// bootstrapped in the same way already). Failing to do so would mean that
|
||||
// followers diverge from the bootstrapped nodes and don't learn about the
|
||||
// initial config.
|
||||
//
|
||||
// NB: this is exactly what CockroachDB does. The Raft log really begins at
|
||||
// index 10, so empty followers (at index 1) always need a snapshot first.
|
||||
type appenderStorage interface {
|
||||
Storage
|
||||
ApplySnapshot(pb.Snapshot) error
|
||||
}
|
||||
bootstrap := func(storage appenderStorage, cs pb.ConfState) error {
|
||||
if len(cs.Nodes) == 0 {
|
||||
return fmt.Errorf("no voters specified")
|
||||
}
|
||||
fi, err := storage.FirstIndex()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if fi < 2 {
|
||||
return fmt.Errorf("FirstIndex >= 2 is prerequisite for bootstrap")
|
||||
}
|
||||
if _, err = storage.Entries(fi, fi, math.MaxUint64); err == nil {
|
||||
// TODO(tbg): match exact error
|
||||
return fmt.Errorf("should not have been able to load first index")
|
||||
}
|
||||
li, err := storage.LastIndex()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = storage.Entries(li, li, math.MaxUint64); err == nil {
|
||||
return fmt.Errorf("should not have been able to load last index")
|
||||
}
|
||||
hs, ics, err := storage.InitialState()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !IsEmptyHardState(hs) {
|
||||
return fmt.Errorf("HardState not empty")
|
||||
}
|
||||
if len(ics.Nodes) != 0 {
|
||||
return fmt.Errorf("ConfState not empty")
|
||||
}
|
||||
|
||||
meta := pb.SnapshotMetadata{
|
||||
Index: 1,
|
||||
Term: 0,
|
||||
ConfState: cs,
|
||||
}
|
||||
snap := pb.Snapshot{Metadata: meta}
|
||||
return storage.ApplySnapshot(snap)
|
||||
}
|
||||
|
||||
if err := bootstrap(storage, pb.ConfState{Nodes: []uint64{1}}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
rd := rawNode.Ready()
|
||||
t.Logf("rd %v", rd)
|
||||
if !reflect.DeepEqual(rd, wants[0]) {
|
||||
t.Fatalf("#%d: g = %+v,\n w %+v", 1, rd, wants[0])
|
||||
} else {
|
||||
storage.Append(rd.Entries)
|
||||
rawNode.Advance(rd)
|
||||
if rawNode.HasReady() {
|
||||
t.Fatalf("unexpected ready: %+v", rawNode.Ready())
|
||||
}
|
||||
storage.Append(rd.Entries)
|
||||
rawNode.Advance(rd)
|
||||
|
||||
rawNode.Campaign()
|
||||
rd = rawNode.Ready()
|
||||
rawNode.Propose([]byte("foo"))
|
||||
if !rawNode.HasReady() {
|
||||
t.Fatal("expected a Ready")
|
||||
}
|
||||
rd := rawNode.Ready()
|
||||
storage.Append(rd.Entries)
|
||||
rawNode.Advance(rd)
|
||||
|
||||
rawNode.Propose([]byte("foo"))
|
||||
if rd = rawNode.Ready(); !reflect.DeepEqual(rd, wants[1]) {
|
||||
t.Errorf("#%d: g = %+v,\n w %+v", 2, rd, wants[1])
|
||||
} else {
|
||||
storage.Append(rd.Entries)
|
||||
rawNode.Advance(rd)
|
||||
rd.SoftState, want.SoftState = nil, nil
|
||||
|
||||
if !reflect.DeepEqual(rd, want) {
|
||||
t.Fatalf("unexpected Ready:\n%+v\nvs\n%+v", rd, want)
|
||||
}
|
||||
|
||||
if rawNode.HasReady() {
|
||||
@ -368,11 +423,11 @@ func TestRawNodeStart(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRawNodeRestart(t *testing.T) {
|
||||
entries := []raftpb.Entry{
|
||||
entries := []pb.Entry{
|
||||
{Term: 1, Index: 1},
|
||||
{Term: 1, Index: 2, Data: []byte("foo")},
|
||||
}
|
||||
st := raftpb.HardState{Term: 1, Commit: 1}
|
||||
st := pb.HardState{Term: 1, Commit: 1}
|
||||
|
||||
want := Ready{
|
||||
HardState: emptyState,
|
||||
@ -384,7 +439,7 @@ func TestRawNodeRestart(t *testing.T) {
|
||||
storage := NewMemoryStorage()
|
||||
storage.SetHardState(st)
|
||||
storage.Append(entries)
|
||||
rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage), nil)
|
||||
rawNode, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, storage))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -399,17 +454,17 @@ func TestRawNodeRestart(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRawNodeRestartFromSnapshot(t *testing.T) {
|
||||
snap := raftpb.Snapshot{
|
||||
Metadata: raftpb.SnapshotMetadata{
|
||||
ConfState: raftpb.ConfState{Nodes: []uint64{1, 2}},
|
||||
snap := pb.Snapshot{
|
||||
Metadata: pb.SnapshotMetadata{
|
||||
ConfState: pb.ConfState{Nodes: []uint64{1, 2}},
|
||||
Index: 2,
|
||||
Term: 1,
|
||||
},
|
||||
}
|
||||
entries := []raftpb.Entry{
|
||||
entries := []pb.Entry{
|
||||
{Term: 1, Index: 3, Data: []byte("foo")},
|
||||
}
|
||||
st := raftpb.HardState{Term: 1, Commit: 3}
|
||||
st := pb.HardState{Term: 1, Commit: 3}
|
||||
|
||||
want := Ready{
|
||||
HardState: emptyState,
|
||||
@ -422,7 +477,7 @@ func TestRawNodeRestartFromSnapshot(t *testing.T) {
|
||||
s.SetHardState(st)
|
||||
s.ApplySnapshot(snap)
|
||||
s.Append(entries)
|
||||
rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), nil)
|
||||
rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -441,7 +496,7 @@ func TestRawNodeRestartFromSnapshot(t *testing.T) {
|
||||
|
||||
func TestRawNodeStatus(t *testing.T) {
|
||||
s := NewMemoryStorage()
|
||||
rn, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s), nil)
|
||||
rn, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -489,20 +544,20 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) {
|
||||
s := &ignoreSizeHintMemStorage{
|
||||
MemoryStorage: NewMemoryStorage(),
|
||||
}
|
||||
persistedHardState := raftpb.HardState{
|
||||
persistedHardState := pb.HardState{
|
||||
Term: 1,
|
||||
Vote: 1,
|
||||
Commit: 10,
|
||||
}
|
||||
|
||||
s.hardState = persistedHardState
|
||||
s.ents = make([]raftpb.Entry, 10)
|
||||
s.ents = make([]pb.Entry, 10)
|
||||
var size uint64
|
||||
for i := range s.ents {
|
||||
ent := raftpb.Entry{
|
||||
ent := pb.Entry{
|
||||
Term: 1,
|
||||
Index: uint64(i + 1),
|
||||
Type: raftpb.EntryNormal,
|
||||
Type: pb.EntryNormal,
|
||||
Data: []byte("a"),
|
||||
}
|
||||
|
||||
@ -516,14 +571,14 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) {
|
||||
// this and *will* return it (which is how the Commit index ended up being 10 initially).
|
||||
cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1
|
||||
|
||||
s.ents = append(s.ents, raftpb.Entry{
|
||||
s.ents = append(s.ents, pb.Entry{
|
||||
Term: 1,
|
||||
Index: uint64(11),
|
||||
Type: raftpb.EntryNormal,
|
||||
Type: pb.EntryNormal,
|
||||
Data: []byte("boom"),
|
||||
})
|
||||
|
||||
rawNode, err := NewRawNode(cfg, []Peer{{ID: 1}})
|
||||
rawNode, err := NewRawNode(cfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -539,8 +594,8 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) {
|
||||
}
|
||||
highestApplied = rd.CommittedEntries[n-1].Index
|
||||
rawNode.Advance(rd)
|
||||
rawNode.Step(raftpb.Message{
|
||||
Type: raftpb.MsgHeartbeat,
|
||||
rawNode.Step(pb.Message{
|
||||
Type: pb.MsgHeartbeat,
|
||||
To: 1,
|
||||
From: 1, // illegal, but we get away with it
|
||||
Term: 1,
|
||||
@ -556,13 +611,13 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) {
|
||||
func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) {
|
||||
const maxEntries = 16
|
||||
data := []byte("testdata")
|
||||
testEntry := raftpb.Entry{Data: data}
|
||||
testEntry := pb.Entry{Data: data}
|
||||
maxEntrySize := uint64(maxEntries * PayloadSize(testEntry))
|
||||
|
||||
s := NewMemoryStorage()
|
||||
cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
|
||||
cfg.MaxUncommittedEntriesSize = maxEntrySize
|
||||
rawNode, err := NewRawNode(cfg, []Peer{{ID: 1}})
|
||||
rawNode, err := NewRawNode(cfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user