mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
raft: add DisableProposalForwarding option
this allows users to disable followers from forwarding proposals to the leader.
This commit is contained in:
parent
8b09309c81
commit
e461017ac5
@ -190,6 +190,38 @@ func TestNodeReadIndex(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestDisableProposalForwarding ensures that proposals are not forwarded to
|
||||
// the leader when DisableProposalForwarding is true.
|
||||
func TestDisableProposalForwarding(t *testing.T) {
|
||||
r1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||
r2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||
cfg3 := newTestConfig(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||
cfg3.DisableProposalForwarding = true
|
||||
r3 := newRaft(cfg3)
|
||||
nt := newNetwork(r1, r2, r3)
|
||||
|
||||
// elect r1 as leader
|
||||
nt.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup})
|
||||
|
||||
var testEntries = []raftpb.Entry{{Data: []byte("testdata")}}
|
||||
|
||||
// send proposal to r2(follower) where DisableProposalForwarding is false
|
||||
r2.Step(raftpb.Message{From: 2, To: 2, Type: raftpb.MsgProp, Entries: testEntries})
|
||||
|
||||
// verify r2(follower) does forward the proposal when DisableProposalForwarding is false
|
||||
if len(r2.msgs) != 1 {
|
||||
t.Fatalf("len(r2.msgs) expected 1, got %d", len(r2.msgs))
|
||||
}
|
||||
|
||||
// send proposal to r3(follower) where DisableProposalForwarding is true
|
||||
r3.Step(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgProp, Entries: testEntries})
|
||||
|
||||
// verify r3(follower) does not forward the proposal when DisableProposalForwarding is true
|
||||
if len(r3.msgs) != 0 {
|
||||
t.Fatalf("len(r3.msgs) expected 0, got %d", len(r3.msgs))
|
||||
}
|
||||
}
|
||||
|
||||
// TestNodeReadIndexToOldLeader ensures that raftpb.MsgReadIndex to old leader
|
||||
// gets forwarded to the new leader and 'send' method does not attach its term.
|
||||
func TestNodeReadIndexToOldLeader(t *testing.T) {
|
||||
|
39
raft/raft.go
39
raft/raft.go
@ -176,6 +176,16 @@ type Config struct {
|
||||
// Logger is the logger used for raft log. For multinode which can host
|
||||
// multiple raft group, each raft group can have its own logger
|
||||
Logger Logger
|
||||
|
||||
// DisableProposalForwarding set to true means that followers will drop
|
||||
// proposals, rather than forwarding them to the leader. One use case for
|
||||
// this feature would be in a situation where the Raft leader is used to
|
||||
// compute the data of a proposal, for example, adding a timestamp from a
|
||||
// hybrid logical clock to data in a monotonically increasing way. Forwarding
|
||||
// should be disabled to prevent a follower with an innaccurate hybrid
|
||||
// logical clock from assigning the timestamp and then forwarding the data
|
||||
// to the leader.
|
||||
DisableProposalForwarding bool
|
||||
}
|
||||
|
||||
func (c *Config) validate() error {
|
||||
@ -256,6 +266,7 @@ type raft struct {
|
||||
// [electiontimeout, 2 * electiontimeout - 1]. It gets reset
|
||||
// when raft changes its state to follower or candidate.
|
||||
randomizedElectionTimeout int
|
||||
disableProposalForwarding bool
|
||||
|
||||
tick func()
|
||||
step stepFunc
|
||||
@ -283,18 +294,19 @@ func newRaft(c *Config) *raft {
|
||||
peers = cs.Nodes
|
||||
}
|
||||
r := &raft{
|
||||
id: c.ID,
|
||||
lead: None,
|
||||
raftLog: raftlog,
|
||||
maxMsgSize: c.MaxSizePerMsg,
|
||||
maxInflight: c.MaxInflightMsgs,
|
||||
prs: make(map[uint64]*Progress),
|
||||
electionTimeout: c.ElectionTick,
|
||||
heartbeatTimeout: c.HeartbeatTick,
|
||||
logger: c.Logger,
|
||||
checkQuorum: c.CheckQuorum,
|
||||
preVote: c.PreVote,
|
||||
readOnly: newReadOnly(c.ReadOnlyOption),
|
||||
id: c.ID,
|
||||
lead: None,
|
||||
raftLog: raftlog,
|
||||
maxMsgSize: c.MaxSizePerMsg,
|
||||
maxInflight: c.MaxInflightMsgs,
|
||||
prs: make(map[uint64]*Progress),
|
||||
electionTimeout: c.ElectionTick,
|
||||
heartbeatTimeout: c.HeartbeatTick,
|
||||
logger: c.Logger,
|
||||
checkQuorum: c.CheckQuorum,
|
||||
preVote: c.PreVote,
|
||||
readOnly: newReadOnly(c.ReadOnlyOption),
|
||||
disableProposalForwarding: c.DisableProposalForwarding,
|
||||
}
|
||||
for _, p := range peers {
|
||||
r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)}
|
||||
@ -1033,6 +1045,9 @@ func stepFollower(r *raft, m pb.Message) {
|
||||
if r.lead == None {
|
||||
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
|
||||
return
|
||||
} else if r.disableProposalForwarding {
|
||||
r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
|
||||
return
|
||||
}
|
||||
m.To = r.lead
|
||||
r.send(m)
|
||||
|
Loading…
x
Reference in New Issue
Block a user