Merge pull request #2004 from xiang90/status

raft: add Status interface
This commit is contained in:
Xiang Li 2015-01-20 10:51:33 -08:00
commit 7c7d78a11f
5 changed files with 146 additions and 83 deletions

View File

@ -1300,15 +1300,18 @@ func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error {
n.Record(testutil.Action{Name: "Step"})
return nil
}
func (n *nodeRecorder) Status() raft.Status { return raft.Status{} }
func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil }
func (n *nodeRecorder) Advance() {}
func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState {
n.Record(testutil.Action{Name: "ApplyConfChange", Params: []interface{}{conf}})
return &raftpb.ConfState{}
}
func (n *nodeRecorder) Stop() {
n.Record(testutil.Action{Name: "Stop"})
}
func (n *nodeRecorder) Compact(index uint64, nodes []uint64, d []byte) {
n.Record(testutil.Action{Name: "Compact"})
}

View File

@ -119,6 +119,8 @@ type Node interface {
// in snapshots. Will never return nil; it returns a pointer only
// to match MemoryStorage.Compact.
ApplyConfChange(cc pb.ConfChange) *pb.ConfState
// Status returns the current status of the raft state machine.
Status() Status
// Stop performs any necessary termination of the Node
Stop()
}
@ -190,6 +192,7 @@ type node struct {
tickc chan struct{}
done chan struct{}
stop chan struct{}
status chan chan Status
}
func newNode() node {
@ -203,6 +206,7 @@ func newNode() node {
tickc: make(chan struct{}),
done: make(chan struct{}),
stop: make(chan struct{}),
status: make(chan chan Status),
}
}
@ -222,8 +226,7 @@ func (n *node) run(r *raft) {
var propc chan pb.Message
var readyc chan Ready
var advancec chan struct{}
var prevLastUnstablei uint64
var prevLastUnstablet uint64
var prevLastUnstablei, prevLastUnstablet uint64
var havePrevLastUnstablei bool
var prevSnapi uint64
var rd Ready
@ -328,6 +331,8 @@ func (n *node) run(r *raft) {
}
r.raftLog.stableSnapTo(prevSnapi)
advancec = nil
case c := <-n.status:
c <- getStatus(r)
case <-n.stop:
close(n.done)
return
@ -407,6 +412,12 @@ func (n *node) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
return &cs
}
func (n *node) Status() Status {
c := make(chan Status)
n.status <- c
return <-c
}
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
rd := Ready{
Entries: r.raftLog.unstableEntries(),

View File

@ -52,61 +52,61 @@ func (st StateType) String() string {
return stmap[uint64(st)]
}
type progress struct {
match, next uint64
wait int
type Progress struct {
Match, Next uint64
Wait int
}
func (pr *progress) update(n uint64) {
func (pr *Progress) update(n uint64) {
pr.waitReset()
if pr.match < n {
pr.match = n
if pr.Match < n {
pr.Match = n
}
if pr.next < n+1 {
pr.next = n + 1
if pr.Next < n+1 {
pr.Next = n + 1
}
}
func (pr *progress) optimisticUpdate(n uint64) { pr.next = n + 1 }
func (pr *Progress) optimisticUpdate(n uint64) { pr.Next = n + 1 }
// maybeDecrTo returns false if the given to index comes from an out of order message.
// Otherwise it decreases the progress next index to min(rejected, last) and returns true.
func (pr *progress) maybeDecrTo(rejected, last uint64) bool {
func (pr *Progress) maybeDecrTo(rejected, last uint64) bool {
pr.waitReset()
if pr.match != 0 {
if pr.Match != 0 {
// the rejection must be stale if the progress has matched and "rejected"
// is smaller than "match".
if rejected <= pr.match {
if rejected <= pr.Match {
return false
}
// directly decrease next to match + 1
pr.next = pr.match + 1
pr.Next = pr.Match + 1
return true
}
// the rejection must be stale if "rejected" does not match next - 1
if pr.next-1 != rejected {
if pr.Next-1 != rejected {
return false
}
if pr.next = min(rejected, last+1); pr.next < 1 {
pr.next = 1
if pr.Next = min(rejected, last+1); pr.Next < 1 {
pr.Next = 1
}
return true
}
func (pr *progress) waitDecr(i int) {
pr.wait -= i
if pr.wait < 0 {
pr.wait = 0
func (pr *Progress) waitDecr(i int) {
pr.Wait -= i
if pr.Wait < 0 {
pr.Wait = 0
}
}
func (pr *progress) waitSet(w int) { pr.wait = w }
func (pr *progress) waitReset() { pr.wait = 0 }
func (pr *progress) shouldWait() bool { return pr.match == 0 && pr.wait > 0 }
func (pr *Progress) waitSet(w int) { pr.Wait = w }
func (pr *Progress) waitReset() { pr.Wait = 0 }
func (pr *Progress) shouldWait() bool { return pr.Match == 0 && pr.Wait > 0 }
func (pr *progress) String() string {
return fmt.Sprintf("next = %d, match = %d, wait = %v", pr.next, pr.match, pr.wait)
func (pr *Progress) String() string {
return fmt.Sprintf("next = %d, match = %d, wait = %v", pr.Next, pr.Match, pr.Wait)
}
type raft struct {
@ -117,7 +117,7 @@ type raft struct {
// the log
raftLog *raftLog
prs map[uint64]*progress
prs map[uint64]*Progress
state StateType
@ -161,13 +161,13 @@ func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage
id: id,
lead: None,
raftLog: raftlog,
prs: make(map[uint64]*progress),
prs: make(map[uint64]*Progress),
electionTimeout: election,
heartbeatTimeout: heartbeat,
}
r.rand = rand.New(rand.NewSource(int64(id)))
for _, p := range peers {
r.prs[p] = &progress{next: 1}
r.prs[p] = &Progress{Next: 1}
}
if !isHardStateEqual(hs, emptyState) {
r.loadState(hs)
@ -220,7 +220,7 @@ func (r *raft) sendAppend(to uint64) {
}
m := pb.Message{}
m.To = to
if r.needSnapshot(pr.next) {
if r.needSnapshot(pr.Next) {
m.Type = pb.MsgSnap
snapshot, err := r.raftLog.snapshot()
if err != nil {
@ -236,15 +236,15 @@ func (r *raft) sendAppend(to uint64) {
pr.waitSet(r.electionTimeout)
} else {
m.Type = pb.MsgApp
m.Index = pr.next - 1
m.LogTerm = r.raftLog.term(pr.next - 1)
m.Entries = r.raftLog.entries(pr.next)
m.Index = pr.Next - 1
m.LogTerm = r.raftLog.term(pr.Next - 1)
m.Entries = r.raftLog.entries(pr.Next)
m.Commit = r.raftLog.committed
// optimistically increase the next if the follower
// has been matched.
if n := len(m.Entries); pr.match != 0 && n != 0 {
if n := len(m.Entries); pr.Match != 0 && n != 0 {
pr.optimisticUpdate(m.Entries[n-1].Index)
} else if pr.match == 0 {
} else if pr.Match == 0 {
// TODO (xiangli): better way to find out if the follower is in good path or not
// a follower might be in bad path even if match != 0, since we optimistically
// increase the next.
@ -262,7 +262,7 @@ func (r *raft) sendHeartbeat(to uint64) {
// or it might not have all the committed entries.
// The leader MUST NOT forward the follower's commit to
// an unmatched index.
commit := min(r.prs[to].match, r.raftLog.committed)
commit := min(r.prs[to].Match, r.raftLog.committed)
m := pb.Message{
To: to,
Type: pb.MsgHeartbeat,
@ -297,7 +297,7 @@ func (r *raft) maybeCommit() bool {
// TODO(bmizerany): optimize.. Currently naive
mis := make(uint64Slice, 0, len(r.prs))
for i := range r.prs {
mis = append(mis, r.prs[i].match)
mis = append(mis, r.prs[i].Match)
}
sort.Sort(sort.Reverse(mis))
mci := mis[r.q()-1]
@ -311,9 +311,9 @@ func (r *raft) reset(term uint64) {
r.elapsed = 0
r.votes = make(map[uint64]bool)
for i := range r.prs {
r.prs[i] = &progress{next: r.raftLog.lastIndex() + 1}
r.prs[i] = &Progress{Next: r.raftLog.lastIndex() + 1}
if i == r.id {
r.prs[i].match = r.raftLog.lastIndex()
r.prs[i].Match = r.raftLog.lastIndex()
}
}
r.pendingConf = false
@ -495,7 +495,7 @@ func stepLeader(r *raft, m pb.Message) {
}
}
case pb.MsgHeartbeatResp:
if r.prs[m.From].match < r.raftLog.lastIndex() {
if r.prs[m.From].Match < r.raftLog.lastIndex() {
r.sendAppend(m.From)
}
case pb.MsgVote:
@ -616,7 +616,7 @@ func (r *raft) restore(s pb.Snapshot) bool {
r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
r.raftLog.restore(s)
r.prs = make(map[uint64]*progress)
r.prs = make(map[uint64]*Progress)
for _, n := range s.Metadata.ConfState.Nodes {
match, next := uint64(0), uint64(r.raftLog.lastIndex())+1
if n == r.id {
@ -660,7 +660,7 @@ func (r *raft) removeNode(id uint64) {
func (r *raft) resetPendingConf() { r.pendingConf = false }
func (r *raft) setProgress(id, match, next uint64) {
r.prs[id] = &progress{next: next, match: match}
r.prs[id] = &Progress{Next: next, Match: match}
}
func (r *raft) delProgress(id uint64) {

View File

@ -64,16 +64,16 @@ func TestProgressUpdate(t *testing.T) {
{prevM + 2, prevM + 2, prevN + 1}, // increase match, next
}
for i, tt := range tests {
p := &progress{
match: prevM,
next: prevN,
p := &Progress{
Match: prevM,
Next: prevN,
}
p.update(tt.update)
if p.match != tt.wm {
t.Errorf("#%d: match= %d, want %d", i, p.match, tt.wm)
if p.Match != tt.wm {
t.Errorf("#%d: match= %d, want %d", i, p.Match, tt.wm)
}
if p.next != tt.wn {
t.Errorf("#%d: next= %d, want %d", i, p.next, tt.wn)
if p.Next != tt.wn {
t.Errorf("#%d: next= %d, want %d", i, p.Next, tt.wn)
}
}
}
@ -136,17 +136,17 @@ func TestProgressMaybeDecr(t *testing.T) {
},
}
for i, tt := range tests {
p := &progress{
match: tt.m,
next: tt.n,
p := &Progress{
Match: tt.m,
Next: tt.n,
}
if g := p.maybeDecrTo(tt.rejected, tt.last); g != tt.w {
t.Errorf("#%d: maybeDecrTo= %t, want %t", i, g, tt.w)
}
if gm := p.match; gm != tt.m {
if gm := p.Match; gm != tt.m {
t.Errorf("#%d: match= %d, want %d", i, gm, tt.m)
}
if gn := p.next; gn != tt.wn {
if gn := p.Next; gn != tt.wn {
t.Errorf("#%d: next= %d, want %d", i, gn, tt.wn)
}
}
@ -166,9 +166,9 @@ func TestProgressShouldWait(t *testing.T) {
{0, 0, false},
}
for i, tt := range tests {
p := &progress{
match: tt.m,
wait: tt.wait,
p := &Progress{
Match: tt.m,
Wait: tt.wait,
}
if g := p.shouldWait(); g != tt.w {
t.Errorf("#%d: shouldwait = %t, want %t", i, g, tt.w)
@ -179,17 +179,17 @@ func TestProgressShouldWait(t *testing.T) {
// TestProgressWaitReset ensures that progress.Update and progress.DercTo
// will reset progress.wait.
func TestProgressWaitReset(t *testing.T) {
p := &progress{
wait: 1,
p := &Progress{
Wait: 1,
}
p.maybeDecrTo(1, 1)
if p.wait != 0 {
t.Errorf("wait= %d, want 0", p.wait)
if p.Wait != 0 {
t.Errorf("wait= %d, want 0", p.Wait)
}
p.wait = 1
p.Wait = 1
p.update(2)
if p.wait != 0 {
t.Errorf("wait= %d, want 0", p.wait)
if p.Wait != 0 {
t.Errorf("wait= %d, want 0", p.Wait)
}
}
@ -198,11 +198,11 @@ func TestProgressDecr(t *testing.T) {
r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
r.becomeCandidate()
r.becomeLeader()
r.prs[2].wait = r.heartbeatTimeout * 2
r.prs[2].Wait = r.heartbeatTimeout * 2
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
if r.prs[2].wait != r.heartbeatTimeout*(2-1) {
t.Errorf("wait = %d, want %d", r.prs[2].wait, r.heartbeatTimeout*(2-1))
if r.prs[2].Wait != r.heartbeatTimeout*(2-1) {
t.Errorf("wait = %d, want %d", r.prs[2].Wait, r.heartbeatTimeout*(2-1))
}
}
@ -1073,11 +1073,11 @@ func TestLeaderAppResp(t *testing.T) {
sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject, RejectHint: tt.index})
p := sm.prs[2]
if p.match != tt.wmatch {
t.Errorf("#%d match = %d, want %d", i, p.match, tt.wmatch)
if p.Match != tt.wmatch {
t.Errorf("#%d match = %d, want %d", i, p.Match, tt.wmatch)
}
if p.next != tt.wnext {
t.Errorf("#%d next = %d, want %d", i, p.next, tt.wnext)
if p.Next != tt.wnext {
t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext)
}
msgs := sm.readMessages()
@ -1119,9 +1119,9 @@ func TestBcastBeat(t *testing.T) {
sm.appendEntry(pb.Entry{Index: uint64(i) + 1})
}
// slow follower
sm.prs[2].match, sm.prs[2].next = 5, 6
sm.prs[2].Match, sm.prs[2].Next = 5, 6
// normal follower
sm.prs[3].match, sm.prs[3].next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1
sm.prs[3].Match, sm.prs[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1
sm.Step(pb.Message{Type: pb.MsgBeat})
msgs := sm.readMessages()
@ -1129,8 +1129,8 @@ func TestBcastBeat(t *testing.T) {
t.Fatalf("len(msgs) = %v, want 2", len(msgs))
}
wantCommitMap := map[uint64]uint64{
2: min(sm.raftLog.committed, sm.prs[2].match),
3: min(sm.raftLog.committed, sm.prs[3].match),
2: min(sm.raftLog.committed, sm.prs[2].Match),
3: min(sm.raftLog.committed, sm.prs[3].Match),
}
for i, m := range msgs {
if m.Type != pb.MsgHeartbeat {
@ -1216,12 +1216,12 @@ func TestLeaderIncreaseNext(t *testing.T) {
sm.raftLog.append(previousEnts...)
sm.becomeCandidate()
sm.becomeLeader()
sm.prs[2].match, sm.prs[2].next = tt.match, tt.next
sm.prs[2].Match, sm.prs[2].Next = tt.match, tt.next
sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
p := sm.prs[2]
if p.next != tt.wnext {
t.Errorf("#%d next = %d, want %d", i, p.next, tt.wnext)
if p.Next != tt.wnext {
t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext)
}
}
}
@ -1310,9 +1310,9 @@ func TestProvideSnap(t *testing.T) {
// force set the next of node 1, so that
// node 1 needs a snapshot
sm.prs[2].next = sm.raftLog.firstIndex()
sm.prs[2].Next = sm.raftLog.firstIndex()
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs[2].next - 1, Reject: true})
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs[2].Next - 1, Reject: true})
msgs := sm.readMessages()
if len(msgs) != 1 {
t.Fatalf("len(msgs) = %d, want 1", len(msgs))
@ -1547,9 +1547,9 @@ func newNetwork(peers ...Interface) *network {
npeers[id] = sm
case *raft:
v.id = id
v.prs = make(map[uint64]*progress)
v.prs = make(map[uint64]*Progress)
for i := 0; i < size; i++ {
v.prs[peerAddrs[i]] = &progress{}
v.prs[peerAddrs[i]] = &Progress{}
}
v.reset(0)
npeers[id] = v

49
raft/status.go Normal file
View File

@ -0,0 +1,49 @@
/*
Copyright 2014 CoreOS, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package raft
import (
pb "github.com/coreos/etcd/raft/raftpb"
)
type Status struct {
ID uint64
pb.HardState
SoftState
Applied uint64
Progress map[uint64]Progress
}
// getStatus gets a copy of the current raft status.
func getStatus(r *raft) Status {
s := Status{ID: r.id}
s.HardState = r.HardState
s.SoftState = *r.softState()
s.Applied = r.raftLog.applied
if s.RaftState == StateLeader {
s.Progress = make(map[uint64]Progress)
for id, p := range r.prs {
s.Progress[id] = *p
}
}
return s
}