raft: fix ReadState

This commit is contained in:
Blake Mizerany 2014-08-23 16:06:55 -07:00 committed by Yicheng Qin
parent 61a413c219
commit ea66d94273
4 changed files with 25 additions and 60 deletions

View File

@ -6,28 +6,29 @@ import (
"code.google.com/p/go.net/context"
)
func applyToStore(ents []Entry) {}
func sendMessages(msgs []Message) {}
func saveStateToDisk(st State) {}
func saveToDisk(ents []Entry) {}
func applyToStore(ents []Entry) {}
func sendMessages(msgs []Message) {}
func saveStateToDisk(st State) {}
func saveToDisk(ents []Entry) {}
func Example_Node() {
n := Start(context.Background(), "", 0, 0)
n := Start(context.Background(), 0)
// stuff to n happens in other gorotines
// stuff to n happens in other goroutines
// the last known state
var prev State
var prev *State
for {
// ReadState blocks until there is new state ready.
st, ents, cents, msgs, err := n.ReadState()
st, ents, cents, msgs, err := n.ReadState(context.Background())
if err != nil {
log.Fatal(err)
}
curr := &st
if !prev.Equal(st) {
saveStateToDisk(st)
prev = st
prev = curr
}
saveToDisk(ents)

View File

@ -4,13 +4,17 @@ package raft
import "code.google.com/p/go.net/context"
type stateResp struct {
state State
st State
ents, cents []Entry
msgs []Message
}
func (a State) Equal(b State) bool {
return a.Term == b.Term && a.Vote == b.Vote && a.LastIndex == b.LastIndex
}
func (sr stateResp) containsUpdates(prev stateResp) bool {
return !prev.state.Equal(sr.state) || len(sr.ents) > 0 || len(sr.cents) > 0 || len(sr.msgs) > 0
return !prev.st.Equal(sr.st) || len(sr.ents) > 0 || len(sr.cents) > 0 || len(sr.msgs) > 0
}
type Node struct {
@ -21,7 +25,7 @@ type Node struct {
tickc chan struct{}
}
func Start(ctx context.Context, name string, election, heartbeat int) *Node {
func Start(ctx context.Context, id int64) *Node {
n := &Node{
ctx: ctx,
propc: make(chan []byte),
@ -29,11 +33,7 @@ func Start(ctx context.Context, name string, election, heartbeat int) *Node {
statec: make(chan stateResp),
tickc: make(chan struct{}),
}
r := &raft{
name: name,
election: election,
heartbeat: heartbeat,
}
r := &raft{raftLog: newLog(), id: id}
go n.run(r)
return n
}
@ -93,10 +93,12 @@ func (n *Node) Tick() error {
}
// Propose proposes data be appended to the log.
func (n *Node) Propose(data []byte) error {
func (n *Node) Propose(ctx context.Context, data []byte) error {
select {
case n.propc <- data:
return nil
case <-ctx.Done():
return ctx.Err()
case <-n.ctx.Done():
return n.ctx.Err()
}
@ -113,10 +115,12 @@ func (n *Node) Step(m Message) error {
}
// ReadState returns the current point-in-time state.
func (n *Node) ReadState() (st State, ents, cents []Entry, msgs []Message, err error) {
func (n *Node) ReadState(ctx context.Context) (st State, ents, cents []Entry, msgs []Message, err error) {
select {
case sr := <-n.statec:
return sr.state, sr.ents, sr.cents, sr.msgs, nil
return sr.st, sr.ents, sr.cents, sr.msgs, nil
case <-ctx.Done():
return State{}, nil, nil, nil, ctx.Err()
case <-n.ctx.Done():
return State{}, nil, nil, nil, n.ctx.Err()
}

View File

@ -22,8 +22,6 @@ import math "math"
import io "io"
import code_google_com_p_gogoprotobuf_proto "code.google.com/p/gogoprotobuf/proto"
import bytes "bytes"
// Reference proto, json, and math imports to suppress error if they are not otherwise used.
var _ = proto.Marshal
var _ = &json.SyntaxError{}
@ -230,40 +228,3 @@ func encodeVarintState(data []byte, offset int, v uint64) int {
data[offset] = uint8(v)
return offset + 1
}
func (this *State) Equal(that interface{}) bool {
if that == nil {
if this == nil {
return true
}
return false
}
that1, ok := that.(*State)
if !ok {
return false
}
if that1 == nil {
if this == nil {
return true
}
return false
} else if this == nil {
return false
}
if this.Term != that1.Term {
return false
}
if this.Vote != that1.Vote {
return false
}
if this.Commit != that1.Commit {
return false
}
if this.LastIndex != that1.LastIndex {
return false
}
if !bytes.Equal(this.XXX_unrecognized, that1.XXX_unrecognized) {
return false
}
return true
}

View File

@ -6,7 +6,6 @@ option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
option (gogoproto.equal_all) = true;
message State {
required int64 term = 1 [(gogoproto.nullable) = false];