mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
raft: addr -> id
This commit is contained in:
parent
c24b6b4150
commit
abd2448931
@ -60,8 +60,8 @@ func TestBasicCluster(t *testing.T) {
|
||||
|
||||
for j := 0; j < tt.round; j++ {
|
||||
for _, n := range nodes {
|
||||
data := []byte{byte(n.addr)}
|
||||
nt.send(Message{Type: msgProp, To: n.addr, Entries: []Entry{{Data: data}}})
|
||||
data := []byte{byte(n.id)}
|
||||
nt.send(Message{Type: msgProp, To: n.id, Entries: []Entry{{Data: data}}})
|
||||
|
||||
base := nodes[0].Next()
|
||||
if len(base) != 1 {
|
||||
|
20
raft/node.go
20
raft/node.go
@ -27,10 +27,10 @@ type Node struct {
|
||||
elapsed tick
|
||||
sm *stateMachine
|
||||
|
||||
addr int
|
||||
id int
|
||||
}
|
||||
|
||||
func New(addr int, heartbeat, election tick) *Node {
|
||||
func New(id int, heartbeat, election tick) *Node {
|
||||
if election < heartbeat*3 {
|
||||
panic("election is least three times as heartbeat [election: %d, heartbeat: %d]")
|
||||
}
|
||||
@ -38,8 +38,8 @@ func New(addr int, heartbeat, election tick) *Node {
|
||||
n := &Node{
|
||||
heartbeat: heartbeat,
|
||||
election: election,
|
||||
addr: addr,
|
||||
sm: newStateMachine(addr, []int{addr}),
|
||||
id: id,
|
||||
sm: newStateMachine(id, []int{id}),
|
||||
}
|
||||
|
||||
return n
|
||||
@ -47,7 +47,7 @@ func New(addr int, heartbeat, election tick) *Node {
|
||||
|
||||
func Dictate(n *Node) *Node {
|
||||
n.Step(Message{Type: msgHup})
|
||||
n.Step(n.newConfMessage(configAdd, &Config{NodeId: n.addr}))
|
||||
n.Step(n.newConfMessage(configAdd, &Config{NodeId: n.id}))
|
||||
return n
|
||||
}
|
||||
|
||||
@ -57,12 +57,12 @@ func (n *Node) Propose(data []byte) {
|
||||
n.Step(m)
|
||||
}
|
||||
|
||||
func (n *Node) Add(addr int) {
|
||||
n.Step(n.newConfMessage(configAdd, &Config{NodeId: addr}))
|
||||
func (n *Node) Add(id int) {
|
||||
n.Step(n.newConfMessage(configAdd, &Config{NodeId: id}))
|
||||
}
|
||||
|
||||
func (n *Node) Remove(addr int) {
|
||||
n.Step(n.newConfMessage(configRemove, &Config{NodeId: addr}))
|
||||
func (n *Node) Remove(id int) {
|
||||
n.Step(n.newConfMessage(configRemove, &Config{NodeId: id}))
|
||||
}
|
||||
|
||||
func (n *Node) Msgs() []Message {
|
||||
@ -138,5 +138,5 @@ func (n *Node) newConfMessage(t int, c *Config) Message {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return Message{Type: msgProp, To: n.addr, Entries: []Entry{Entry{Type: t, Data: data}}}
|
||||
return Message{Type: msgProp, To: n.id, Entries: []Entry{Entry{Type: t, Data: data}}}
|
||||
}
|
||||
|
@ -95,8 +95,8 @@ func TestStartCluster(t *testing.T) {
|
||||
if len(n.sm.ins) != 1 {
|
||||
t.Errorf("k = %d, want 1", len(n.sm.ins))
|
||||
}
|
||||
if n.sm.addr != 0 {
|
||||
t.Errorf("addr = %d, want 0", n.sm.addr)
|
||||
if n.sm.id != 0 {
|
||||
t.Errorf("addr = %d, want 0", n.sm.id)
|
||||
}
|
||||
if n.sm.state != stateLeader {
|
||||
t.Errorf("state = %s, want %s", n.sm.state, stateLeader)
|
||||
@ -112,8 +112,8 @@ func TestAdd(t *testing.T) {
|
||||
if len(n.sm.ins) != 2 {
|
||||
t.Errorf("k = %d, want 2", len(n.sm.ins))
|
||||
}
|
||||
if n.sm.addr != 0 {
|
||||
t.Errorf("addr = %d, want 0", n.sm.addr)
|
||||
if n.sm.id != 0 {
|
||||
t.Errorf("addr = %d, want 0", n.sm.id)
|
||||
}
|
||||
}
|
||||
|
||||
@ -129,7 +129,7 @@ func TestRemove(t *testing.T) {
|
||||
if len(n.sm.ins) != 1 {
|
||||
t.Errorf("k = %d, want 1", len(n.sm.ins))
|
||||
}
|
||||
if n.sm.addr != 0 {
|
||||
t.Errorf("addr = %d, want 0", n.sm.addr)
|
||||
if n.sm.id != 0 {
|
||||
t.Errorf("addr = %d, want 0", n.sm.id)
|
||||
}
|
||||
}
|
||||
|
40
raft/raft.go
40
raft/raft.go
@ -81,7 +81,7 @@ func (in *index) decr() {
|
||||
}
|
||||
|
||||
type stateMachine struct {
|
||||
addr int
|
||||
id int
|
||||
|
||||
// the term we are participating in at any time
|
||||
term int
|
||||
@ -100,15 +100,15 @@ type stateMachine struct {
|
||||
|
||||
msgs []Message
|
||||
|
||||
// the leader addr
|
||||
// the leader id
|
||||
lead int
|
||||
|
||||
// pending reconfiguration
|
||||
pendingConf bool
|
||||
}
|
||||
|
||||
func newStateMachine(addr int, peers []int) *stateMachine {
|
||||
sm := &stateMachine{addr: addr, log: newLog(), ins: make(map[int]*index)}
|
||||
func newStateMachine(id int, peers []int) *stateMachine {
|
||||
sm := &stateMachine{id: id, log: newLog(), ins: make(map[int]*index)}
|
||||
for p := range peers {
|
||||
sm.ins[p] = &index{}
|
||||
}
|
||||
@ -123,9 +123,9 @@ func (sm *stateMachine) canStep(m Message) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (sm *stateMachine) poll(addr int, v bool) (granted int) {
|
||||
if _, ok := sm.votes[addr]; !ok {
|
||||
sm.votes[addr] = v
|
||||
func (sm *stateMachine) poll(id int, v bool) (granted int) {
|
||||
if _, ok := sm.votes[id]; !ok {
|
||||
sm.votes[id] = v
|
||||
}
|
||||
for _, vv := range sm.votes {
|
||||
if vv {
|
||||
@ -137,7 +137,7 @@ func (sm *stateMachine) poll(addr int, v bool) (granted int) {
|
||||
|
||||
// send persists state to stable storage and then sends to its mailbox.
|
||||
func (sm *stateMachine) send(m Message) {
|
||||
m.From = sm.addr
|
||||
m.From = sm.id
|
||||
m.Term = sm.term
|
||||
sm.msgs = append(sm.msgs, m)
|
||||
}
|
||||
@ -158,7 +158,7 @@ func (sm *stateMachine) sendAppend(to int) {
|
||||
// bcastAppend sends RRPC, with entries to all peers that are not up-to-date according to sm.mis.
|
||||
func (sm *stateMachine) bcastAppend() {
|
||||
for i := range sm.ins {
|
||||
if i == sm.addr {
|
||||
if i == sm.id {
|
||||
continue
|
||||
}
|
||||
sm.sendAppend(i)
|
||||
@ -188,7 +188,7 @@ func (sm *stateMachine) reset() {
|
||||
sm.votes = make(map[int]bool)
|
||||
for i := range sm.ins {
|
||||
sm.ins[i] = &index{next: sm.log.lastIndex() + 1}
|
||||
if i == sm.addr {
|
||||
if i == sm.id {
|
||||
sm.ins[i].match = sm.log.lastIndex()
|
||||
}
|
||||
}
|
||||
@ -213,7 +213,7 @@ func (sm *stateMachine) becomeCandidate() {
|
||||
}
|
||||
sm.reset()
|
||||
sm.term++
|
||||
sm.vote = sm.addr
|
||||
sm.vote = sm.id
|
||||
sm.state = stateCandidate
|
||||
}
|
||||
|
||||
@ -223,7 +223,7 @@ func (sm *stateMachine) becomeLeader() {
|
||||
panic("invalid transition [follower -> leader]")
|
||||
}
|
||||
sm.reset()
|
||||
sm.lead = sm.addr
|
||||
sm.lead = sm.id
|
||||
sm.state = stateLeader
|
||||
|
||||
for _, e := range sm.log.ents[sm.log.committed:] {
|
||||
@ -244,12 +244,12 @@ func (sm *stateMachine) Step(m Message) {
|
||||
switch m.Type {
|
||||
case msgHup:
|
||||
sm.becomeCandidate()
|
||||
if sm.q() == sm.poll(sm.addr, true) {
|
||||
if sm.q() == sm.poll(sm.id, true) {
|
||||
sm.becomeLeader()
|
||||
return
|
||||
}
|
||||
for i := range sm.ins {
|
||||
if i == sm.addr {
|
||||
if i == sm.id {
|
||||
continue
|
||||
}
|
||||
lasti := sm.log.lastIndex()
|
||||
@ -268,7 +268,7 @@ func (sm *stateMachine) Step(m Message) {
|
||||
}
|
||||
|
||||
switch sm.lead {
|
||||
case sm.addr:
|
||||
case sm.id:
|
||||
e := m.Entries[0]
|
||||
if e.Type == configAdd || e.Type == configRemove {
|
||||
if sm.pendingConf {
|
||||
@ -280,7 +280,7 @@ func (sm *stateMachine) Step(m Message) {
|
||||
e.Term = sm.term
|
||||
|
||||
sm.log.append(sm.log.lastIndex(), e)
|
||||
sm.ins[sm.addr].update(sm.log.lastIndex())
|
||||
sm.ins[sm.id].update(sm.log.lastIndex())
|
||||
sm.maybeCommit()
|
||||
sm.bcastAppend()
|
||||
case none:
|
||||
@ -356,12 +356,12 @@ func (sm *stateMachine) Step(m Message) {
|
||||
}
|
||||
}
|
||||
|
||||
func (sm *stateMachine) Add(addr int) {
|
||||
sm.ins[addr] = &index{next: sm.log.lastIndex() + 1}
|
||||
func (sm *stateMachine) Add(id int) {
|
||||
sm.ins[id] = &index{next: sm.log.lastIndex() + 1}
|
||||
sm.pendingConf = false
|
||||
}
|
||||
|
||||
func (sm *stateMachine) Remove(addr int) {
|
||||
delete(sm.ins, addr)
|
||||
func (sm *stateMachine) Remove(id int) {
|
||||
delete(sm.ins, id)
|
||||
sm.pendingConf = false
|
||||
}
|
||||
|
@ -143,7 +143,7 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDuelingCandidates(t *testing.T) {
|
||||
a := newStateMachine(0, nil) // k, addr are set later
|
||||
a := newStateMachine(0, nil) // k, id are set later
|
||||
c := newStateMachine(0, nil)
|
||||
|
||||
tt := newNetwork(a, nil, c)
|
||||
@ -638,20 +638,20 @@ type network struct {
|
||||
}
|
||||
|
||||
// newNetwork initializes a network from peers. A nil node will be replaced
|
||||
// with a new *stateMachine. A *stateMachine will get its k, addr.
|
||||
// with a new *stateMachine. A *stateMachine will get its k, id.
|
||||
func newNetwork(peers ...Interface) *network {
|
||||
peerAddrs := make([]int, len(peers))
|
||||
for i := range peers {
|
||||
peerAddrs[i] = i
|
||||
}
|
||||
|
||||
for addr, p := range peers {
|
||||
for id, p := range peers {
|
||||
switch v := p.(type) {
|
||||
case nil:
|
||||
sm := newStateMachine(addr, peerAddrs)
|
||||
peers[addr] = sm
|
||||
sm := newStateMachine(id, peerAddrs)
|
||||
peers[id] = sm
|
||||
case *stateMachine:
|
||||
v.addr = addr
|
||||
v.id = id
|
||||
v.ins = make(map[int]*index)
|
||||
for i := range peerAddrs {
|
||||
v.ins[i] = &index{}
|
||||
@ -680,11 +680,11 @@ func (nw *network) cut(one, other int) {
|
||||
nw.drop(other, one, 1)
|
||||
}
|
||||
|
||||
func (nw *network) isolate(addr int) {
|
||||
func (nw *network) isolate(id int) {
|
||||
for i := 0; i < len(nw.peers); i++ {
|
||||
if i != addr {
|
||||
nw.drop(addr, i, 1.0)
|
||||
nw.drop(i, addr, 1.0)
|
||||
if i != id {
|
||||
nw.drop(id, i, 1.0)
|
||||
nw.drop(i, id, 1.0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user