Merge pull request #13760 from ahrtr/fix_raftexample_snapshot

Update the confstate before sending snapshot
This commit is contained in:
Marek Siarkowicz
2022-03-08 10:34:02 +01:00
committed by GitHub
2 changed files with 142 additions and 1 deletions

View File

@@ -456,7 +456,7 @@ func (rc *raftNode) serveChannels() {
rc.publishSnapshot(rd.Snapshot)
}
rc.raftStorage.Append(rd.Entries)
rc.transport.Send(rd.Messages)
rc.transport.Send(rc.processMessages(rd.Messages))
applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
if !ok {
rc.stop()
@@ -476,6 +476,18 @@ func (rc *raftNode) serveChannels() {
}
}
// When there is a `raftpb.EntryConfChange` after creating the snapshot,
// then the confState included in the snapshot is out of date. so We need
// to update the confState before sending a snapshot to a follower.
func (rc *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
for i := 0; i < len(ms); i++ {
if ms[i].Type == raftpb.MsgSnap {
ms[i].Snapshot.Metadata.ConfState = rc.confState
}
}
return ms
}
func (rc *raftNode) serveRaft() {
url, err := url.Parse(rc.peers[rc.id-1])
if err != nil {

View File

@@ -0,0 +1,129 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"go.etcd.io/etcd/raft/v3/raftpb"
"reflect"
"testing"
)
func TestProcessMessages(t *testing.T) {
cases := []struct {
name string
confState raftpb.ConfState
InputMessages []raftpb.Message
ExpectedMessages []raftpb.Message
}{
{
name: "only one snapshot message",
confState: raftpb.ConfState{
Voters: []uint64{2, 6, 8, 10},
},
InputMessages: []raftpb.Message{
{
Type: raftpb.MsgSnap,
To: 8,
Snapshot: raftpb.Snapshot{
Metadata: raftpb.SnapshotMetadata{
Index: 100,
Term: 3,
ConfState: raftpb.ConfState{
Voters: []uint64{2, 6, 8},
AutoLeave: true,
},
},
},
},
},
ExpectedMessages: []raftpb.Message{
{
Type: raftpb.MsgSnap,
To: 8,
Snapshot: raftpb.Snapshot{
Metadata: raftpb.SnapshotMetadata{
Index: 100,
Term: 3,
ConfState: raftpb.ConfState{
Voters: []uint64{2, 6, 8, 10},
},
},
},
},
},
},
{
name: "one snapshot message and one other message",
confState: raftpb.ConfState{
Voters: []uint64{2, 7, 8, 12},
},
InputMessages: []raftpb.Message{
{
Type: raftpb.MsgSnap,
To: 8,
Snapshot: raftpb.Snapshot{
Metadata: raftpb.SnapshotMetadata{
Index: 100,
Term: 3,
ConfState: raftpb.ConfState{
Voters: []uint64{2, 6, 8},
AutoLeave: true,
},
},
},
},
{
Type: raftpb.MsgApp,
From: 6,
To: 8,
},
},
ExpectedMessages: []raftpb.Message{
{
Type: raftpb.MsgSnap,
To: 8,
Snapshot: raftpb.Snapshot{
Metadata: raftpb.SnapshotMetadata{
Index: 100,
Term: 3,
ConfState: raftpb.ConfState{
Voters: []uint64{2, 7, 8, 12},
},
},
},
},
{
Type: raftpb.MsgApp,
From: 6,
To: 8,
},
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
rn := &raftNode{
confState: tc.confState,
}
outputMessages := rn.processMessages(tc.InputMessages)
if !reflect.DeepEqual(outputMessages, tc.ExpectedMessages) {
t.Fatalf("Unexpected messages, expected: %v, got %v", tc.ExpectedMessages, outputMessages)
}
})
}
}