Merge pull request #10889 from tbg/joint-conf-change-logic

raft: internally support joint consensus
This commit is contained in:
Tobias Grieger 2019-07-16 16:02:16 +02:00 committed by GitHub
commit 9fba06ba3b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 1171 additions and 132 deletions

View File

@ -0,0 +1,420 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package confchange
import (
"errors"
"fmt"
"strings"
"go.etcd.io/etcd/raft/quorum"
pb "go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/raft/tracker"
)
// Changer facilitates configuration changes. It exposes methods to handle
// simple and joint consensus while performing the proper validation that allows
// refusing invalid configuration changes before they affect the active
// configuration.
type Changer struct {
	// Tracker supplies the currently active config and progress map. The
	// exported methods work on copies obtained via checkAndCopy and hand the
	// result back to the caller instead of storing it here.
	Tracker tracker.ProgressTracker
	// LastIndex seeds Progress.Next (as LastIndex+1) for peers that are newly
	// added by a configuration change; see initProgress.
	LastIndex uint64
}
// EnterJoint moves a non-joint configuration into a joint one. The outgoing
// (=right) majority config must be empty on entry; it is populated with a copy
// of the incoming (=left) majority config, i.e. the state changes from
//
//     (1 2 3)&&()
// to
//     (1 2 3)&&(1 2 3).
//
// Afterwards the given ConfChanges are applied to the incoming majority config
// only. In the terminology of the Raft thesis[1] (Section 4.3) the result is
// `C_{new,old}`.
//
// An error is returned if the config is already joint, if it has no voters,
// if applying the changes fails, or if the invariants are violated before or
// after the change.
//
// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf
func (c Changer) EnterJoint(ccs ...pb.ConfChange) (tracker.Config, tracker.ProgressMap, error) {
	cfg, prs, err := c.checkAndCopy()
	switch {
	case err != nil:
		return c.err(err)
	case joint(cfg):
		return c.err(errors.New("config is already joint"))
	case len(incoming(cfg.Voters)) == 0:
		// Adding nodes to an empty config is allowed for convenience (testing
		// and bootstrap), but such a config cannot become joint.
		return c.err(errors.New("can't make a zero-voter config joint"))
	}

	// Replace whatever the outgoing slot held with a fresh copy of the
	// incoming voters.
	snapshot := quorum.MajorityConfig{}
	for id := range incoming(cfg.Voters) {
		snapshot[id] = struct{}{}
	}
	*outgoingPtr(&cfg.Voters) = snapshot

	if applyErr := c.apply(&cfg, prs, ccs...); applyErr != nil {
		return c.err(applyErr)
	}
	return checkAndReturn(cfg, prs)
}
// LeaveJoint exits a joint configuration. Calling it on a configuration that
// is not joint (i.e. whose outgoing majority config Voters[1] is empty) is an
// error.
//
// The outgoing majority config is dropped, leaving the incoming config as the
// sole decision maker. In the notation of the Raft thesis[1] (Section 4.3),
// this is the transition from `C_{new,old}` to `C_new`.
//
// Learners that were staged in LearnersNext (because the peer was still a
// voter in the outgoing config) are moved into Learners and their Progress is
// marked as learner. Progress entries of peers that were only part of the
// outgoing config are removed.
//
// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf
func (c Changer) LeaveJoint() (tracker.Config, tracker.ProgressMap, error) {
	cfg, prs, err := c.checkAndCopy()
	if err != nil {
		return c.err(err)
	}
	if !joint(cfg) {
		return c.err(errors.New("can't leave a non-joint config"))
	}
	if len(outgoing(cfg.Voters)) == 0 {
		return c.err(fmt.Errorf("configuration is not joint: %v", cfg))
	}

	// Turn every staged learner into a real one.
	for id := range cfg.LearnersNext {
		nilAwareAdd(&cfg.Learners, id)
		prs[id].IsLearner = true
	}
	cfg.LearnersNext = nil

	// Forget peers of the outgoing config that are neither incoming voters
	// nor learners.
	for id := range outgoing(cfg.Voters) {
		if _, kept := incoming(cfg.Voters)[id]; kept {
			continue
		}
		if _, kept := cfg.Learners[id]; kept {
			continue
		}
		delete(prs, id)
	}
	*outgoingPtr(&cfg.Voters) = nil

	return checkAndReturn(cfg, prs)
}
// Simple carries out a series of configuration changes that (in aggregate)
// mutates the incoming majority config Voters[0] by at most one. This method
// will return an error if that is not the case, if the resulting quorum is
// zero, or if the configuration is in a joint state (i.e. if there is an
// outgoing configuration).
//
// On success the new config and progress map are returned; the Changer's
// Tracker is not modified. On any error (including an invariant violation in
// the resulting config) a zero Config and a non-nil error are returned.
func (c Changer) Simple(ccs ...pb.ConfChange) (tracker.Config, tracker.ProgressMap, error) {
	cfg, prs, err := c.checkAndCopy()
	if err != nil {
		return c.err(err)
	}
	if joint(cfg) {
		err := errors.New("can't apply simple config change in joint config")
		return c.err(err)
	}
	if err := c.apply(&cfg, prs, ccs...); err != nil {
		return c.err(err)
	}
	// Compare against the voters we started from: a simple change may add or
	// remove at most one voter.
	if n := symdiff(incoming(c.Tracker.Voters), incoming(cfg.Voters)); n > 1 {
		// NB: the message text is matched verbatim by the golden test files.
		return c.err(errors.New("more than voter changed without entering joint config"))
	}

	// checkAndReturn validates the invariants and propagates any violation as
	// an error. A separate pre-check that returned a nil error together with
	// an empty config used to sit here and silently masked such violations.
	return checkAndReturn(cfg, prs)
}
// apply runs the given ConfChanges against cfg and prs in order. Voter changes
// always target the incoming majority config Voters[0]; Voters[1] is either
// empty or holds the outgoing majority config while in a joint state.
//
// Changes with NodeID zero are skipped and ConfChangeUpdateNode is a no-op. An
// unknown change type aborts with an error, as does ending up without any
// incoming voter.
func (c Changer) apply(cfg *tracker.Config, prs tracker.ProgressMap, ccs ...pb.ConfChange) error {
	for i := range ccs {
		id, typ := ccs[i].NodeID, ccs[i].Type
		if id == 0 {
			// etcd zeroes the NodeID when it decides (downstream of raft) not
			// to apply a ConfChange, so these have to be ignored explicitly.
			continue
		}
		switch typ {
		case pb.ConfChangeUpdateNode:
			// Nothing to do.
		case pb.ConfChangeAddNode:
			c.makeVoter(cfg, prs, id)
		case pb.ConfChangeAddLearnerNode:
			c.makeLearner(cfg, prs, id)
		case pb.ConfChangeRemoveNode:
			c.remove(cfg, prs, id)
		default:
			return fmt.Errorf("unexpected conf type %d", typ)
		}
	}
	if len(incoming(cfg.Voters)) > 0 {
		return nil
	}
	return errors.New("removed all voters")
}
// makeVoter ensures that id is a voter in the incoming majority config. An
// untracked peer gets a fresh Progress; a tracked one keeps its Progress, is
// no longer flagged as learner and is dropped from Learners and LearnersNext.
func (c Changer) makeVoter(cfg *tracker.Config, prs tracker.ProgressMap, id uint64) {
	if pr := prs[id]; pr != nil {
		pr.IsLearner = false
		nilAwareDelete(&cfg.Learners, id)
		nilAwareDelete(&cfg.LearnersNext, id)
		incoming(cfg.Voters)[id] = struct{}{}
	} else {
		c.initProgress(cfg, prs, id, false /* isLearner */)
	}
}
// makeLearner turns id into a learner, either immediately or deferred until
// the active joint configuration is left.
//
// A peer that is not tracked yet is added as a new learner; a peer that is
// already a learner is left alone. Otherwise the peer is removed from the
// incoming config while its Progress is preserved, and then:
//
//   - if it is still a voter in the outgoing config it is only staged in
//     LearnersNext, because a peer must not be tracked as voter and learner at
//     the same time; LeaveJoint() moves it into Learners later;
//   - if not, it becomes a regular learner right away.
func (c Changer) makeLearner(cfg *tracker.Config, prs tracker.ProgressMap, id uint64) {
	pr := prs[id]
	switch {
	case pr == nil:
		c.initProgress(cfg, prs, id, true /* isLearner */)
		return
	case pr.IsLearner:
		return
	}

	// Drop the voter from the incoming config, then put the Progress back
	// since remove may have deleted it.
	c.remove(cfg, prs, id)
	prs[id] = pr

	if _, stillOutgoing := outgoing(cfg.Voters)[id]; !stillOutgoing {
		pr.IsLearner = true
		nilAwareAdd(&cfg.Learners, id)
		return
	}
	// Still a voter on the outgoing side: stage it for LeaveJoint().
	nilAwareAdd(&cfg.LearnersNext, id)
}
// remove drops the peer from the incoming voters, Learners and LearnersNext.
// Untracked peers are ignored. The Progress is deleted as well unless the peer
// is still a voter in the outgoing config.
func (c Changer) remove(cfg *tracker.Config, prs tracker.ProgressMap, id uint64) {
	_, tracked := prs[id]
	if !tracked {
		return
	}

	delete(incoming(cfg.Voters), id)
	nilAwareDelete(&cfg.Learners, id)
	nilAwareDelete(&cfg.LearnersNext, id)

	_, stillOutgoing := outgoing(cfg.Voters)[id]
	if stillOutgoing {
		// The outgoing config still references the peer; keep its Progress.
		return
	}
	delete(prs, id)
}
// initProgress registers id as an incoming voter or as a learner (depending on
// isLearner) and creates a brand-new Progress for it in prs.
func (c Changer) initProgress(cfg *tracker.Config, prs tracker.ProgressMap, id uint64, isLearner bool) {
	if isLearner {
		nilAwareAdd(&cfg.Learners, id)
	} else {
		incoming(cfg.Voters)[id] = struct{}{}
	}

	pr := new(tracker.Progress)
	// Next starts out at LastIndex+1.
	//
	// TODO(tbg): the original author flagged this choice as a best guess that
	// still needs verification.
	pr.Next = c.LastIndex + 1
	pr.Match = 0
	pr.Inflights = tracker.NewInflights(c.Tracker.MaxInflight)
	pr.IsLearner = isLearner
	// A freshly added node counts as recently active. Otherwise CheckQuorum
	// could make us step down if it runs before the new node has had a chance
	// to communicate with us.
	pr.RecentActive = true
	prs[id] = pr
}
// checkInvariants verifies that cfg and prs are consistent with each other. It
// is applied both to the state the Changer starts from and to the state it
// returns. The first violation found is reported as an error.
func checkInvariants(cfg tracker.Config, prs tracker.ProgressMap) error {
	// NB: the empty config is intentionally accepted. It is needed to be able
	// to *create* an initial config, for example during bootstrap or in tests;
	// allowing the transition from an empty config into any legal non-empty
	// one avoids having to hand-code that.
	in, out := incoming(cfg.Voters), outgoing(cfg.Voters)

	// Everything mentioned by the config needs a Progress.
	tracked := []map[uint64]struct{}{
		cfg.Voters.IDs(),
		cfg.Learners,
		cfg.LearnersNext,
	}
	for _, set := range tracked {
		for id := range set {
			if _, found := prs[id]; !found {
				return fmt.Errorf("no progress for %d", id)
			}
		}
	}

	// A learner is only staged when a voter in the outgoing config prevented
	// adding it directly, and it is not yet flagged as learner.
	for id := range cfg.LearnersNext {
		_, inOutgoing := out[id]
		switch {
		case !inOutgoing:
			return fmt.Errorf("%d is in LearnersNext, but not Voters[1]", id)
		case prs[id].IsLearner:
			return fmt.Errorf("%d is in LearnersNext, but is already marked as learner", id)
		}
	}

	// Conversely, Learners must not intersect with either voter set and must
	// be flagged as learners.
	for id := range cfg.Learners {
		_, inOutgoing := out[id]
		_, inIncoming := in[id]
		switch {
		case inOutgoing:
			return fmt.Errorf("%d is in Learners and Voters[1]", id)
		case inIncoming:
			return fmt.Errorf("%d is in Learners and Voters[0]", id)
		case !prs[id].IsLearner:
			return fmt.Errorf("%d is in Learners, but is not marked as learner", id)
		}
	}

	if joint(cfg) {
		return nil
	}
	// Outside of a joint config, empty maps are required to be nil rather
	// than allocated-but-empty.
	if out != nil {
		return fmt.Errorf("Voters[1] must be nil when not joint")
	}
	if cfg.LearnersNext != nil {
		return fmt.Errorf("LearnersNext must be nil when not joint")
	}
	return nil
}
// checkAndCopy clones the tracker's config and copies its progress map (deeply
// enough for the purposes of the Changer), validates the copies with
// checkInvariants and returns them, or the validation error.
func (c Changer) checkAndCopy() (tracker.Config, tracker.ProgressMap, error) {
	cfg := c.Tracker.Config.Clone()
	prs := make(tracker.ProgressMap, len(c.Tracker.Progress))
	for id := range c.Tracker.Progress {
		// Copying the struct by value suffices: only the IsLearner field is
		// mutated on the copy.
		dup := *c.Tracker.Progress[id]
		prs[id] = &dup
	}
	return checkAndReturn(cfg, prs)
}
// checkAndReturn passes its arguments through unchanged if checkInvariants
// accepts them; otherwise it returns an empty Config, an empty ProgressMap and
// the error.
func checkAndReturn(cfg tracker.Config, prs tracker.ProgressMap) (tracker.Config, tracker.ProgressMap, error) {
	err := checkInvariants(cfg, prs)
	if err == nil {
		return cfg, prs, nil
	}
	return tracker.Config{}, tracker.ProgressMap{}, err
}
// err pairs the given error with a zero Config and a nil ProgressMap so that
// it can be returned directly from the exported methods.
func (c Changer) err(err error) (tracker.Config, tracker.ProgressMap, error) {
	var (
		noCfg tracker.Config
		noPrs tracker.ProgressMap
	)
	return noCfg, noPrs, err
}
// nilAwareAdd inserts id into the set behind m, allocating the map first if
// the pointer currently refers to a nil map.
func nilAwareAdd(m *map[uint64]struct{}, id uint64) {
	set := *m
	if set == nil {
		set = make(map[uint64]struct{})
		*m = set
	}
	set[id] = struct{}{}
}
// nilAwareDelete removes id from the set behind m. A nil map is left alone; a
// map that is empty after the removal is replaced by nil.
func nilAwareDelete(m *map[uint64]struct{}, id uint64) {
	set := *m
	if set == nil {
		return
	}
	if delete(set, id); len(set) == 0 {
		*m = nil
	}
}
// symdiff returns the size of the symmetric difference of the two sets of
// uint64s, i.e. len( (l - r) \union (r - l)).
func symdiff(l, r map[uint64]struct{}) int {
	// onlyIn counts the members of a that are missing from b.
	onlyIn := func(a, b map[uint64]struct{}) int {
		count := 0
		for id := range a {
			if _, shared := b[id]; !shared {
				count++
			}
		}
		return count
	}
	return onlyIn(l, r) + onlyIn(r, l)
}
// joint reports whether cfg is a joint configuration, i.e. whether its
// outgoing majority config has at least one voter.
func joint(cfg tracker.Config) bool {
	return len(outgoing(cfg.Voters)) != 0
}
// incoming returns the incoming (=left) majority config, voters[0].
func incoming(voters quorum.JointConfig) quorum.MajorityConfig {
	return voters[0]
}

// outgoing returns the outgoing (=right) majority config, voters[1].
func outgoing(voters quorum.JointConfig) quorum.MajorityConfig {
	return voters[1]
}

// outgoingPtr returns a pointer to the outgoing slot so that the map stored
// there can be replaced.
func outgoingPtr(voters *quorum.JointConfig) *quorum.MajorityConfig {
	return &(*voters)[1]
}
// Describe renders each configuration change as "Type(NodeID)" and joins the
// results with single spaces.
func Describe(ccs ...pb.ConfChange) string {
	parts := make([]string, 0, len(ccs))
	for _, cc := range ccs {
		parts = append(parts, fmt.Sprintf("%s(%d)", cc.Type, cc.NodeID))
	}
	return strings.Join(parts, " ")
}

View File

@ -0,0 +1,105 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package confchange
import (
"errors"
"fmt"
"strconv"
"strings"
"testing"
"github.com/cockroachdb/datadriven"
pb "go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/raft/tracker"
)
// TestConfChangeDataDriven runs every file under testdata against a fresh
// Changer and compares the printed config and progress (or the error) with the
// expectation stored in the file.
func TestConfChangeDataDriven(t *testing.T) {
	datadriven.Walk(t, "testdata", func(t *testing.T, path string) {
		c := Changer{
			Tracker:   tracker.MakeProgressTracker(10),
			LastIndex: 0, // incremented in this test with each cmd
		}

		// The test files use the commands
		// - simple: run a simple conf change (i.e. no joint consensus),
		// - enter-joint: enter a joint config, and
		// - leave-joint: leave a joint config.
		// The first two take a list of config changes, which have the following
		// syntax:
		// - vn: make n a voter,
		// - ln: make n a learner,
		// - rn: remove n, and
		// - un: update n.
		datadriven.RunTest(t, path, func(d *datadriven.TestData) string {
			// Every command, successful or not, advances LastIndex by one.
			defer func() { c.LastIndex++ }()

			toks := strings.Split(strings.TrimSpace(d.Input), " ")
			if toks[0] == "" {
				toks = nil
			}

			var ccs []pb.ConfChange
			for _, tok := range toks {
				if len(tok) < 2 {
					return fmt.Sprintf("unknown token %s", tok)
				}
				var typ pb.ConfChangeType
				switch tok[0] {
				case 'v':
					typ = pb.ConfChangeAddNode
				case 'l':
					typ = pb.ConfChangeAddLearnerNode
				case 'r':
					typ = pb.ConfChangeRemoveNode
				case 'u':
					typ = pb.ConfChangeUpdateNode
				default:
					return fmt.Sprintf("unknown input: %s", tok)
				}
				nodeID, err := strconv.ParseUint(tok[1:], 10, 64)
				if err != nil {
					return err.Error()
				}
				ccs = append(ccs, pb.ConfChange{Type: typ, NodeID: nodeID})
			}

			var (
				cfg tracker.Config
				prs tracker.ProgressMap
				err error
			)
			switch d.Cmd {
			case "simple":
				cfg, prs, err = c.Simple(ccs...)
			case "enter-joint":
				cfg, prs, err = c.EnterJoint(ccs...)
			case "leave-joint":
				if len(ccs) > 0 {
					err = errors.New("this command takes no input")
					break
				}
				cfg, prs, err = c.LeaveJoint()
			default:
				return "unknown command"
			}
			if err != nil {
				return err.Error() + "\n"
			}

			// Only successful commands update the tracked state.
			c.Tracker.Config, c.Tracker.Progress = cfg, prs
			return fmt.Sprintf("%s\n%s", c.Tracker.Config, c.Tracker.Progress)
		})
	})
}

View File

@ -0,0 +1,168 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package confchange
import (
"math/rand"
"reflect"
"testing"
"testing/quick"
pb "go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/raft/tracker"
)
// TestConfChangeQuick uses quickcheck to verify that applying a list of
// changes one by one via Simple and applying them in one go via
// EnterJoint/LeaveJoint arrive at the same result.
func TestConfChangeQuick(t *testing.T) {
	// Log the first couple of runs to give some indication of things working
	// as intended.
	const infoCount = 5

	// install makes cfg and prs the active state of c.
	install := func(c *Changer, cfg tracker.Config, prs tracker.ProgressMap) {
		c.Tracker.Config = cfg
		c.Tracker.Progress = prs
	}

	runWithSimple := func(c *Changer, ccs []pb.ConfChange) error {
		for i := range ccs {
			cfg, prs, err := c.Simple(ccs[i])
			if err != nil {
				return err
			}
			install(c, cfg, prs)
		}
		return nil
	}

	runWithJoint := func(c *Changer, ccs []pb.ConfChange) error {
		cfg, prs, err := c.EnterJoint(ccs...)
		if err != nil {
			return err
		}
		install(c, cfg, prs)
		if cfg, prs, err = c.LeaveJoint(); err != nil {
			return err
		}
		install(c, cfg, prs)
		return nil
	}

	// run creates a fresh Changer, brings it into the initial state described
	// by setup using simple changes, and then lets invoke apply ccs.
	run := func(
		invoke func(*Changer, []pb.ConfChange) error,
		setup initialChanges,
		ccs confChanges,
	) (*Changer, error) {
		c := &Changer{
			Tracker:   tracker.MakeProgressTracker(10),
			LastIndex: 10,
		}
		if err := runWithSimple(c, setup); err != nil {
			return nil, err
		}
		err := invoke(c, ccs)
		return c, err
	}

	var logged int
	viaSimple := func(setup initialChanges, ccs confChanges) *Changer {
		c, err := run(runWithSimple, setup, ccs)
		if err != nil {
			t.Fatal(err)
		}
		if logged < infoCount {
			t.Log("initial setup:", Describe(setup...))
			t.Log("changes:", Describe(ccs...))
			t.Log(c.Tracker.Config)
			t.Log(c.Tracker.Progress)
		}
		logged++
		return c
	}
	viaJoint := func(setup initialChanges, ccs confChanges) *Changer {
		c, err := run(runWithJoint, setup, ccs)
		if err != nil {
			t.Fatal(err)
		}
		return c
	}

	err := quick.CheckEqual(viaSimple, viaJoint, &quick.Config{MaxCount: 1000})
	if err == nil {
		return
	}
	mismatch, isMismatch := err.(*quick.CheckEqualError)
	if !isMismatch {
		t.Fatal(err)
	}

	// Report the offending input and both outputs.
	t.Error("setup:", Describe(mismatch.In[0].([]pb.ConfChange)...))
	t.Error("ccs:", Describe(mismatch.In[1].([]pb.ConfChange)...))
	t.Errorf("out1: %+v\nout2: %+v", mismatch.Out1, mismatch.Out2)
}
// confChangeTyp is a pb.ConfChangeType that testing/quick can generate.
type confChangeTyp pb.ConfChangeType

// Generate returns a confChangeTyp drawn from the values 0 through 3.
func (confChangeTyp) Generate(rand *rand.Rand, _ int) reflect.Value {
	typ := confChangeTyp(rand.Intn(4))
	return reflect.ValueOf(typ)
}
// confChanges is the list of conf changes under test; its Generate method lets
// testing/quick produce random instances.
type confChanges []pb.ConfChange
// genCC builds num() conf changes, asking typ and then id for the fields of
// each one. The result is nil if num() is not positive.
func genCC(num func() int, id func() uint64, typ func() pb.ConfChangeType) []pb.ConfChange {
	var ccs []pb.ConfChange
	for remaining := num(); remaining > 0; remaining-- {
		ccs = append(ccs, pb.ConfChange{Type: typ(), NodeID: id()})
	}
	return ccs
}
// Generate produces between one and nine conf changes of arbitrary type that
// target NodeIDs two through ten.
func (confChanges) Generate(rand *rand.Rand, _ int) reflect.Value {
	count := func() int {
		return rand.Intn(9) + 1
	}
	nodeID := func() uint64 {
		// count() is at least one, so the result is at least two. NodeID one
		// is therefore never touched; it is special in this test and keeps
		// the config from ever becoming voterless.
		return uint64(count()) + 1
	}
	kind := func() pb.ConfChangeType {
		numTypes := len(pb.ConfChangeType_name)
		return pb.ConfChangeType(rand.Intn(numTypes))
	}
	return reflect.ValueOf(genCC(count, nodeID, kind))
}
// initialChanges is the list of conf changes used to set up the initial config
// of a test run.
type initialChanges []pb.ConfChange

// Generate produces the addition of voter one followed by one to five
// additions of voters with NodeIDs one through five.
func (initialChanges) Generate(rand *rand.Rand, _ int) reflect.Value {
	count := func() int {
		return rand.Intn(5) + 1
	}
	nodeID := func() uint64 {
		return uint64(count())
	}
	addVoter := func() pb.ConfChangeType {
		return pb.ConfChangeAddNode
	}

	// NodeID one is special - it's in the initial config and will be a voter
	// always (this is to avoid uninteresting edge cases where the simple conf
	// changes can't easily make progress).
	first := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 1}
	rest := genCC(count, nodeID, addVoter)

	ccs := make([]pb.ConfChange, 0, 1+len(rest))
	ccs = append(ccs, first)
	ccs = append(ccs, rest...)
	return reflect.ValueOf(ccs)
}

View File

@ -0,0 +1,23 @@
# Verify that operations upon entering the joint state are idempotent, i.e.
# removing an absent node is fine, etc.
simple
v1
----
voters=(1)
1: StateProbe match=0 next=1
enter-joint
r1 r2 r9 v2 v3 v4 v2 v3 v4 l2 l2 r4 r4 l1 l1
----
voters=(3)&&(1) learners=(2) learners_next=(1)
1: StateProbe match=0 next=1
2: StateProbe match=0 next=2 learner
3: StateProbe match=0 next=2
leave-joint
----
voters=(3) learners=(1 2)
1: StateProbe match=0 next=1 learner
2: StateProbe match=0 next=2 learner
3: StateProbe match=0 next=2

View File

@ -0,0 +1,24 @@
# Verify that when a voter is demoted in a joint config, it will show up in
# learners_next until the joint config is left, and only then will the progress
# turn into that of a learner, without resetting the progress. Note that this
# last fact is verified by `next`, which can tell us which "round" the progress
# was originally created in.
simple
v1
----
voters=(1)
1: StateProbe match=0 next=1
enter-joint
v2 l1
----
voters=(2)&&(1) learners_next=(1)
1: StateProbe match=0 next=1
2: StateProbe match=0 next=2
leave-joint
----
voters=(2) learners=(1)
1: StateProbe match=0 next=1 learner
2: StateProbe match=0 next=2

View File

@ -0,0 +1,81 @@
leave-joint
----
can't leave a non-joint config
enter-joint
----
can't make a zero-voter config joint
enter-joint
v1
----
can't make a zero-voter config joint
simple
v1
----
voters=(1)
1: StateProbe match=0 next=4
leave-joint
----
can't leave a non-joint config
# Can enter into joint config.
enter-joint
----
voters=(1)&&(1)
1: StateProbe match=0 next=4
enter-joint
----
config is already joint
leave-joint
----
voters=(1)
1: StateProbe match=0 next=4
leave-joint
----
can't leave a non-joint config
# Can enter again, this time with some ops.
enter-joint
r1 v2 v3 l4
----
voters=(2 3)&&(1) learners=(4)
1: StateProbe match=0 next=4
2: StateProbe match=0 next=10
3: StateProbe match=0 next=10
4: StateProbe match=0 next=10 learner
enter-joint
----
config is already joint
enter-joint
v12
----
config is already joint
simple
l15
----
can't apply simple config change in joint config
leave-joint
----
voters=(2 3) learners=(4)
2: StateProbe match=0 next=10
3: StateProbe match=0 next=10
4: StateProbe match=0 next=10 learner
simple
l9
----
voters=(2 3) learners=(4 9)
2: StateProbe match=0 next=10
3: StateProbe match=0 next=10
4: StateProbe match=0 next=10 learner
9: StateProbe match=0 next=15 learner

View File

@ -0,0 +1,69 @@
simple
v1
----
voters=(1)
1: StateProbe match=0 next=1
simple
v1
----
voters=(1)
1: StateProbe match=0 next=1
simple
v2
----
voters=(1 2)
1: StateProbe match=0 next=1
2: StateProbe match=0 next=3
simple
l1
----
voters=(2) learners=(1)
1: StateProbe match=0 next=1 learner
2: StateProbe match=0 next=3
simple
l1
----
voters=(2) learners=(1)
1: StateProbe match=0 next=1 learner
2: StateProbe match=0 next=3
simple
r1
----
voters=(2)
2: StateProbe match=0 next=3
simple
r1
----
voters=(2)
2: StateProbe match=0 next=3
simple
v3
----
voters=(2 3)
2: StateProbe match=0 next=3
3: StateProbe match=0 next=8
simple
r3
----
voters=(2)
2: StateProbe match=0 next=3
simple
r3
----
voters=(2)
2: StateProbe match=0 next=3
simple
r4
----
voters=(2)
2: StateProbe match=0 next=3

View File

@ -0,0 +1,60 @@
# Set up three voters for this test.
simple
v1
----
voters=(1)
1: StateProbe match=0 next=1
simple
v2
----
voters=(1 2)
1: StateProbe match=0 next=1
2: StateProbe match=0 next=2
simple
v3
----
voters=(1 2 3)
1: StateProbe match=0 next=1
2: StateProbe match=0 next=2
3: StateProbe match=0 next=3
# Can atomically demote and promote without a hitch.
# This is pointless, but possible.
simple
l1 v1
----
voters=(1 2 3)
1: StateProbe match=0 next=1
2: StateProbe match=0 next=2
3: StateProbe match=0 next=3
# Can demote a voter.
simple
l2
----
voters=(1 3) learners=(2)
1: StateProbe match=0 next=1
2: StateProbe match=0 next=2 learner
3: StateProbe match=0 next=3
# Can atomically promote and demote the same voter.
# This is pointless, but possible.
simple
v2 l2
----
voters=(1 3) learners=(2)
1: StateProbe match=0 next=1
2: StateProbe match=0 next=2 learner
3: StateProbe match=0 next=3
# Can promote a voter.
simple
v2
----
voters=(1 2 3)
1: StateProbe match=0 next=1
2: StateProbe match=0 next=2
3: StateProbe match=0 next=3

View File

@ -0,0 +1,64 @@
simple
l1
----
removed all voters
simple
v1
----
voters=(1)
1: StateProbe match=0 next=2
simple
v2 l3
----
voters=(1 2) learners=(3)
1: StateProbe match=0 next=2
2: StateProbe match=0 next=3
3: StateProbe match=0 next=3 learner
simple
r1 v5
----
more than voter changed without entering joint config
simple
r1 r2
----
removed all voters
simple
v3 v4
----
more than voter changed without entering joint config
simple
l1 v5
----
more than voter changed without entering joint config
simple
l1 l2
----
removed all voters
simple
l2 l3 l4 l5
----
voters=(1) learners=(2 3 4 5)
1: StateProbe match=0 next=2
2: StateProbe match=0 next=3 learner
3: StateProbe match=0 next=3 learner
4: StateProbe match=0 next=9 learner
5: StateProbe match=0 next=9 learner
simple
r1
----
removed all voters
simple
r2 r3 r4 r5
----
voters=(1)
1: StateProbe match=0 next=2

23
raft/confchange/testdata/update.txt vendored Normal file
View File

@ -0,0 +1,23 @@
# Nobody cares about ConfChangeUpdateNode, but at least use it once. It is used
# by etcd as a convenient way to pass a blob through their conf change machinery
# that updates information tracked outside of raft.
simple
v1
----
voters=(1)
1: StateProbe match=0 next=1
simple
v2 u1
----
voters=(1 2)
1: StateProbe match=0 next=1
2: StateProbe match=0 next=2
simple
u1 u2 u3 u1 u2 u3
----
voters=(1 2)
1: StateProbe match=0 next=1
2: StateProbe match=0 next=2

6
raft/confchange/testdata/zero.txt vendored Normal file
View File

@ -0,0 +1,6 @@
# NodeID zero is ignored.
simple
v1 r0 v0 l0
----
voters=(1)
1: StateProbe match=0 next=1

View File

@ -24,6 +24,7 @@ import (
"sync"
"time"
"go.etcd.io/etcd/raft/confchange"
"go.etcd.io/etcd/raft/quorum"
pb "go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/raft/tracker"
@ -356,15 +357,11 @@ func newRaft(c *Config) *raft {
}
for _, p := range peers {
// Add node to active config.
r.prs.InitProgress(p, 0 /* match */, 1 /* next */, false /* isLearner */)
r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: p})
}
for _, p := range learners {
// Add learner to active config.
r.prs.InitProgress(p, 0 /* match */, 1 /* next */, true /* isLearner */)
if r.id == p {
r.isLearner = true
}
r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddLearnerNode, NodeID: p})
}
if !isHardStateEqual(hs, emptyState) {
@ -1401,55 +1398,15 @@ func (r *raft) promotable() bool {
}
func (r *raft) applyConfChange(cc pb.ConfChange) pb.ConfState {
addNodeOrLearnerNode := func(id uint64, isLearner bool) {
// NB: this method is intentionally hidden from view. All mutations of
// the conf state must call applyConfChange directly.
pr := r.prs.Progress[id]
if pr == nil {
r.prs.InitProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
} else {
if isLearner && !pr.IsLearner {
// Can only change Learner to Voter.
//
// TODO(tbg): why?
r.logger.Infof("%x ignored addLearner: do not support changing %x from raft peer to learner.", r.id, id)
return
}
if isLearner == pr.IsLearner {
// Ignore any redundant addNode calls (which can happen because the
// initial bootstrapping entries are applied twice).
return
}
// Change Learner to Voter, use origin Learner progress.
r.prs.RemoveAny(id)
r.prs.InitProgress(id, 0 /* match */, 1 /* next */, false /* isLearner */)
pr.IsLearner = false
*r.prs.Progress[id] = *pr
}
// When a node is first added, we should mark it as recently active.
// Otherwise, CheckQuorum may cause us to step down if it is invoked
// before the added node has had a chance to communicate with us.
r.prs.Progress[id].RecentActive = true
}
var removed int
if cc.NodeID != None {
switch cc.Type {
case pb.ConfChangeAddNode:
addNodeOrLearnerNode(cc.NodeID, false /* isLearner */)
case pb.ConfChangeAddLearnerNode:
addNodeOrLearnerNode(cc.NodeID, true /* isLearner */)
case pb.ConfChangeRemoveNode:
removed++
r.prs.RemoveAny(cc.NodeID)
case pb.ConfChangeUpdateNode:
default:
panic("unexpected conf type")
}
cfg, prs, err := confchange.Changer{
Tracker: r.prs,
LastIndex: r.raftLog.lastIndex(),
}.Simple(cc)
if err != nil {
panic(err)
}
r.prs.Config = cfg
r.prs.Progress = prs
r.logger.Infof("%x switched to configuration %s", r.id, r.prs.Config)
// Now that the configuration is updated, handle any side effects.
@ -1479,12 +1436,10 @@ func (r *raft) applyConfChange(cc pb.ConfChange) pb.ConfState {
if r.state != StateLeader || len(cs.Nodes) == 0 {
return cs
}
if removed > 0 {
if r.maybeCommit() {
// The quorum size may have been reduced (but not to zero), so see if
// any pending entries can be committed.
if r.maybeCommit() {
r.bcastAppend()
}
r.bcastAppend()
}
// If the leadTransferee was removed, abort the leadership transfer.
if _, tOK := r.prs.Progress[r.leadTransferee]; !tOK && r.leadTransferee != 0 {

View File

@ -1140,9 +1140,13 @@ func TestCommit(t *testing.T) {
storage.hardState = pb.HardState{Term: tt.smTerm}
sm := newTestRaft(1, []uint64{1}, 10, 2, storage)
sm.prs.RemoveAny(1)
for j := 0; j < len(tt.matches); j++ {
sm.prs.InitProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1, false)
id := uint64(j) + 1
if id > 1 {
sm.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: id})
}
pr := sm.prs.Progress[id]
pr.Match, pr.Next = tt.matches[j], tt.matches[j]+1
}
sm.maybeCommit()
if g := sm.raftLog.committed; g != tt.w {
@ -1927,7 +1931,7 @@ func TestNonPromotableVoterWithCheckQuorum(t *testing.T) {
nt := newNetwork(a, b)
setRandomizedElectionTimeout(b, b.electionTimeout+1)
// Need to remove 2 again to make it a non-promotable node, since newNetwork overwrote some internal state.
b.prs.RemoveAny(2)
b.applyConfChange(pb.ConfChange{Type: pb.ConfChangeRemoveNode, NodeID: 2})
if b.promotable() {
t.Fatalf("promotable = %v, want false", b.promotable())
@ -3093,14 +3097,42 @@ func TestAddNode(t *testing.T) {
// TestAddLearner tests that addLearner could update nodes correctly.
func TestAddLearner(t *testing.T) {
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
// Add new learner peer.
r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode})
if r.isLearner {
t.Fatal("expected 1 to be voter")
}
nodes := r.prs.LearnerNodes()
wnodes := []uint64{2}
if !reflect.DeepEqual(nodes, wnodes) {
t.Errorf("nodes = %v, want %v", nodes, wnodes)
}
if !r.prs.Progress[2].IsLearner {
t.Errorf("node 2 is learner %t, want %t", r.prs.Progress[2].IsLearner, true)
t.Fatal("expected 2 to be learner")
}
// Promote peer to voter.
r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode})
if r.prs.Progress[2].IsLearner {
t.Fatal("expected 2 to be voter")
}
// Demote r.
r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeAddLearnerNode})
if !r.prs.Progress[1].IsLearner {
t.Fatal("expected 1 to be learner")
}
if !r.isLearner {
t.Fatal("expected 1 to be learner")
}
// Promote r again.
r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeAddNode})
if r.prs.Progress[1].IsLearner {
t.Fatal("expected 1 to be voter")
}
if r.isLearner {
t.Fatal("expected 1 to be voter")
}
}
@ -3148,12 +3180,13 @@ func TestRemoveNode(t *testing.T) {
t.Errorf("nodes = %v, want %v", g, w)
}
// remove all nodes from cluster
// Removing the remaining voter will panic.
defer func() {
if r := recover(); r == nil {
t.Error("did not panic")
}
}()
r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode})
w = []uint64{}
if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) {
t.Errorf("nodes = %v, want %v", g, w)
}
}
// TestRemoveLearner tests that removeNode could update nodes and
@ -3171,12 +3204,15 @@ func TestRemoveLearner(t *testing.T) {
t.Errorf("nodes = %v, want %v", g, w)
}
// remove all nodes from cluster
// Removing the remaining voter will panic.
defer func() {
if r := recover(); r == nil {
t.Error("did not panic")
}
}()
r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode})
if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) {
t.Errorf("nodes = %v, want %v", g, w)
}
}
func TestPromotable(t *testing.T) {
id := uint64(1)
tests := []struct {
@ -4124,12 +4160,16 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw
sm := newRaft(cfg)
npeers[id] = sm
case *raft:
// TODO(tbg): this is all pretty confused. Clean this up.
learners := make(map[uint64]bool, len(v.prs.Learners))
for i := range v.prs.Learners {
learners[i] = true
}
v.id = id
v.prs = tracker.MakeProgressTracker(v.prs.MaxInflight)
if len(learners) > 0 {
v.prs.Learners = map[uint64]struct{}{}
}
for i := 0; i < size; i++ {
pr := &tracker.Progress{}
if _, ok := learners[peerAddrs[i]]; ok {

View File

@ -16,6 +16,7 @@ package tracker
import (
"fmt"
"sort"
"strings"
)
@ -235,3 +236,22 @@ func (pr *Progress) String() string {
}
return buf.String()
}
// ProgressMap associates a peer ID with the *Progress tracked for that peer.
type ProgressMap map[uint64]*Progress

// String renders the map as one "<id>: <progress>" line per entry, with the
// entries ordered by ascending ID so that the output is deterministic despite
// Go's randomized map iteration order.
func (m ProgressMap) String() string {
	keys := make([]uint64, 0, len(m))
	for id := range m {
		keys = append(keys, id)
	}
	sort.Slice(keys, func(a, b int) bool { return keys[a] < keys[b] })

	var out strings.Builder
	for _, id := range keys {
		fmt.Fprintf(&out, "%d: %s\n", id, m[id])
	}
	return out.String()
}

View File

@ -17,6 +17,7 @@ package tracker
import (
"fmt"
"sort"
"strings"
"go.etcd.io/etcd/raft/quorum"
)
@ -33,12 +34,11 @@ type Config struct {
// simplifies the implementation since it allows peers to have clarity about
// its current role without taking into account joint consensus.
Learners map[uint64]struct{}
// TODO(tbg): when we actually carry out joint consensus changes and turn a
// voter into a learner, we cannot add the learner when entering the joint
// state. This is because this would violate the invariant that the inter-
// section of voters and learners is empty. For example, assume a Voter is
// removed and immediately re-added as a learner (or in other words, it is
// demoted).
// When we turn a voter into a learner during a joint consensus transition,
// we cannot add the learner directly when entering the joint state. This is
// because this would violate the invariant that the intersection of
// voters and learners is empty. For example, assume a Voter is removed and
// immediately re-added as a learner (or in other words, it is demoted):
//
// Initially, the configuration will be
//
@ -51,7 +51,7 @@ type Config struct {
// learners: {3}
//
// but this violates the invariant (3 is both voter and learner). Instead,
// we have
// we get
//
// voters: {1 2} & {1 2 3}
// learners: {}
@ -66,20 +66,40 @@ type Config struct {
//
// Note that next_learners is not used while adding a learner that is not
// also a voter in the joint config. In this case, the learner is added
// to Learners right away when entering the joint configuration, so that it
// is caught up as soon as possible.
//
// NextLearners map[uint64]struct{}
// right away when entering the joint configuration, so that it is caught up
// as soon as possible.
LearnersNext map[uint64]struct{}
}
func (c *Config) String() string {
if len(c.Learners) == 0 {
return fmt.Sprintf("voters=%s", c.Voters)
// String formats the configuration as "voters=<joint config>", followed by
// " learners=..." and " learners_next=..." sections. Each of the two optional
// sections is emitted only when the corresponding map is non-nil, so an
// allocated-but-empty map is still printed.
func (c Config) String() string {
	var out strings.Builder
	fmt.Fprintf(&out, "voters=%s", c.Voters)
	if learners := c.Learners; learners != nil {
		fmt.Fprintf(&out, " learners=%s", quorum.MajorityConfig(learners).String())
	}
	if next := c.LearnersNext; next != nil {
		fmt.Fprintf(&out, " learners_next=%s", quorum.MajorityConfig(next).String())
	}
	return out.String()
}
// Clone returns a copy of the Config that shares no memory with the original.
// Both halves of the joint voter config, Learners and LearnersNext are each
// copied into freshly allocated maps; nil maps stay nil in the copy.
func (c *Config) Clone() Config {
// clone duplicates a set of IDs into a new map of the same size.
clone := func(m map[uint64]struct{}) map[uint64]struct{} {
// Preserve nil-ness rather than turning nil into an empty map.
if m == nil {
return nil
}
mm := make(map[uint64]struct{}, len(m))
for k := range m {
mm[k] = struct{}{}
}
return mm
}
return Config{
Voters: quorum.JointConfig{clone(c.Voters[0]), clone(c.Voters[1])},
Learners: clone(c.Learners),
LearnersNext: clone(c.LearnersNext),
}
// NOTE(review): the statement below looks like leftover text from the
// previous String implementation (removed lines of the diff) and sits after
// the return above — confirm it is not part of Clone.
return fmt.Sprintf(
"voters=%s learners=%s",
c.Voters, quorum.MajorityConfig(c.Learners).String(),
)
}
// ProgressTracker tracks the currently active configuration and the information
@ -88,7 +108,7 @@ func (c *Config) String() string {
type ProgressTracker struct {
Config
Progress map[uint64]*Progress
Progress ProgressMap
Votes map[uint64]bool
@ -102,11 +122,10 @@ func MakeProgressTracker(maxInflight int) ProgressTracker {
Config: Config{
Voters: quorum.JointConfig{
quorum.MajorityConfig{},
// TODO(tbg): this will be mostly empty, so make it a nil pointer
// in the common case.
quorum.MajorityConfig{},
nil, // only populated when used
},
Learners: map[uint64]struct{}{},
Learners: nil, // only populated when used
LearnersNext: nil, // only populated when used
},
Votes: map[uint64]bool{},
Progress: map[uint64]*Progress{},
@ -139,44 +158,6 @@ func (p *ProgressTracker) Committed() uint64 {
return uint64(p.Voters.CommittedIndex(matchAckIndexer(p.Progress)))
}
// RemoveAny removes this peer, which *must* be tracked as a voter or learner,
// from the tracker.
//
// The peer is deleted from both halves of the joint voter config, from the
// learner set and from the progress map. The method panics (with a string
// message that includes the peer ID in hex) when
//   - the peer has no entry in Progress,
//   - the peer is in neither the voter nor the learner sets, or
//   - the peer is listed as both a voter and a learner.
func (p *ProgressTracker) RemoveAny(id uint64) {
	_, okPR := p.Progress[id]
	_, okV1 := p.Voters[0][id]
	_, okV2 := p.Voters[1][id]
	_, okL := p.Learners[id]

	// A peer counts as a voter if it appears in either half of the joint
	// config.
	okV := okV1 || okV2

	switch {
	case !okPR, !okV && !okL:
		// Format the message explicitly: panic does not interpret verbs, so
		// a bare "%x" would be printed literally and hide the peer ID.
		panic(fmt.Sprintf("attempting to remove unknown peer %x", id))
	case okV && okL:
		panic(fmt.Sprintf("peer %x is both voter and learner", id))
	}

	delete(p.Voters[0], id)
	delete(p.Voters[1], id)
	delete(p.Learners, id)
	delete(p.Progress, id)
}
// InitProgress starts tracking a peer that is not tracked yet, either as a
// voter (added to the first half of the voter config) or, when isLearner is
// set, as a learner. It panics if a Progress already exists for the ID.
//
// The new Progress is created with the supplied match and next indexes and a
// fresh Inflights sized by the tracker's MaxInflight.
func (p *ProgressTracker) InitProgress(id, match, next uint64, isLearner bool) {
	if existing := p.Progress[id]; existing != nil {
		panic(fmt.Sprintf("peer %x already tracked as node %v", id, existing))
	}

	// Record membership in exactly one of the two sets.
	if isLearner {
		p.Learners[id] = struct{}{}
	} else {
		p.Voters[0][id] = struct{}{}
	}

	fresh := &Progress{
		Next:      next,
		Match:     match,
		Inflights: NewInflights(p.MaxInflight),
		IsLearner: isLearner,
	}
	p.Progress[id] = fresh
}
// Visit invokes the supplied closure for all tracked progresses.
func (p *ProgressTracker) Visit(f func(id uint64, pr *Progress)) {
for id, pr := range p.Progress {