mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
raft: move raft2 to raft
This commit is contained in:
committed by
Yicheng Qin
parent
15bb84d320
commit
134a962222
@@ -1,190 +0,0 @@
|
||||
package raft
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestBuildCluster ensures cluster with various size could be built.
|
||||
func TestBuildCluster(t *testing.T) {
|
||||
tests := []struct {
|
||||
size int
|
||||
ids []int64
|
||||
}{
|
||||
{1, nil},
|
||||
{3, nil},
|
||||
{5, nil},
|
||||
{7, nil},
|
||||
{9, nil},
|
||||
{13, nil},
|
||||
{51, nil},
|
||||
{1, []int64{1}},
|
||||
{3, []int64{1, 3, 5}},
|
||||
{5, []int64{1, 4, 7, 10, 13}},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
_, nodes := buildCluster(tt.size, tt.ids)
|
||||
|
||||
base := ltoa(nodes[0].sm.raftLog)
|
||||
for j, n := range nodes {
|
||||
// ensure same log
|
||||
l := ltoa(n.sm.raftLog)
|
||||
if g := diffu(base, l); g != "" {
|
||||
t.Errorf("#%d.%d: log diff:\n%s", i, j, g)
|
||||
}
|
||||
|
||||
// ensure same leader
|
||||
var w int64
|
||||
if tt.ids != nil {
|
||||
w = tt.ids[0]
|
||||
}
|
||||
if g := n.sm.lead.Get(); g != w {
|
||||
t.Errorf("#%d.%d: lead = %d, want %d", i, j, g, w)
|
||||
}
|
||||
|
||||
// ensure same peer map
|
||||
p := map[int64]struct{}{}
|
||||
for k := range n.sm.ins {
|
||||
p[k] = struct{}{}
|
||||
}
|
||||
wp := map[int64]struct{}{}
|
||||
for k := 0; k < tt.size; k++ {
|
||||
if tt.ids != nil {
|
||||
wp[tt.ids[k]] = struct{}{}
|
||||
} else {
|
||||
wp[int64(k)] = struct{}{}
|
||||
}
|
||||
}
|
||||
if !reflect.DeepEqual(p, wp) {
|
||||
t.Errorf("#%d.%d: peers = %+v, want %+v", i, j, p, wp)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestInitCluster(t *testing.T) {
|
||||
node := New(1, defaultHeartbeat, defaultElection)
|
||||
dictate(node)
|
||||
node.Next()
|
||||
|
||||
if node.ClusterId() != 0xBEEF {
|
||||
t.Errorf("clusterId = %x, want %x", node.ClusterId(), 0xBEEF)
|
||||
}
|
||||
|
||||
func() {
|
||||
defer func() {
|
||||
e := recover()
|
||||
if e != "cannot init a started cluster" {
|
||||
t.Errorf("err = %v, want cannot init a started cluster", e)
|
||||
}
|
||||
}()
|
||||
node.InitCluster(0xFBEE)
|
||||
node.Next()
|
||||
}()
|
||||
}
|
||||
|
||||
func TestMessageFromDifferentCluster(t *testing.T) {
|
||||
tests := []struct {
|
||||
clusterId int64
|
||||
wType messageType
|
||||
}{
|
||||
{0xBEEF, msgVoteResp},
|
||||
{0xFBEE, msgDenied},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
node := New(1, defaultHeartbeat, defaultElection)
|
||||
dictate(node)
|
||||
node.Next()
|
||||
|
||||
node.Step(Message{From: 1, ClusterId: tt.clusterId, Type: msgVote, Term: 2, LogTerm: 2, Index: 2})
|
||||
msgs := node.Msgs()
|
||||
if len(msgs) != 1 {
|
||||
t.Errorf("#%d: len(msgs) = %d, want 1", i, len(msgs))
|
||||
}
|
||||
if msgs[0].Type != tt.wType {
|
||||
t.Errorf("#%d: msg.Type = %v, want %d", i, msgs[0].Type, tt.wType)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestBasicCluster ensures all nodes can send proposal to the cluster.
|
||||
// And all the proposals will get committed.
|
||||
func TestBasicCluster(t *testing.T) {
|
||||
tests := []struct {
|
||||
size int
|
||||
round int
|
||||
}{
|
||||
{1, 3},
|
||||
{3, 3},
|
||||
{5, 3},
|
||||
{7, 3},
|
||||
{13, 1},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
nt, nodes := buildCluster(tt.size, nil)
|
||||
|
||||
for j := 0; j < tt.round; j++ {
|
||||
for _, n := range nodes {
|
||||
data := []byte{byte(n.Id())}
|
||||
nt.send(Message{From: n.Id(), To: n.Id(), ClusterId: n.ClusterId(), Type: msgProp, Entries: []Entry{{Data: data}}})
|
||||
|
||||
base := nodes[0].Next()
|
||||
if len(base) != 1 {
|
||||
t.Fatalf("#%d: len(ents) = %d, want 1", i, len(base))
|
||||
}
|
||||
if !reflect.DeepEqual(base[0].Data, data) {
|
||||
t.Errorf("#%d: data = %s, want %s", i, base[0].Data, data)
|
||||
}
|
||||
for k := 1; k < tt.size; k++ {
|
||||
g := nodes[k].Next()
|
||||
if !reflect.DeepEqual(g, base) {
|
||||
t.Errorf("#%d.%d: ent = %v, want %v", i, k, g, base)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This function is full of heck now. It will go away when we finish our
|
||||
// network Interface, and ticker infrastructure.
|
||||
func buildCluster(size int, ids []int64) (nt *network, nodes []*Node) {
|
||||
if ids == nil {
|
||||
ids = make([]int64, size)
|
||||
for i := 0; i < size; i++ {
|
||||
ids[i] = int64(i)
|
||||
}
|
||||
}
|
||||
|
||||
nodes = make([]*Node, size)
|
||||
nis := make([]Interface, size)
|
||||
for i := range nodes {
|
||||
nodes[i] = New(ids[i], defaultHeartbeat, defaultElection)
|
||||
nis[i] = nodes[i]
|
||||
}
|
||||
nt = newNetwork(nis...)
|
||||
|
||||
lead := dictate(nodes[0])
|
||||
lead.Next()
|
||||
for i := 1; i < size; i++ {
|
||||
lead.Add(ids[i], "", nil)
|
||||
nt.send(lead.Msgs()...)
|
||||
for j := 0; j < i; j++ {
|
||||
nodes[j].Next()
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < 10*defaultHeartbeat; i++ {
|
||||
nodes[0].Tick()
|
||||
}
|
||||
msgs := nodes[0].Msgs()
|
||||
nt.send(msgs...)
|
||||
|
||||
for _, n := range nodes {
|
||||
n.Next()
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -1,12 +0,0 @@
|
||||
package raft;
|
||||
|
||||
import "code.google.com/p/gogoprotobuf/gogoproto/gogo.proto";
|
||||
|
||||
option (gogoproto.marshaler_all) = true;
|
||||
option (gogoproto.sizer_all) = true;
|
||||
option (gogoproto.unmarshaler_all) = true;
|
||||
option (gogoproto.goproto_getters_all) = false;
|
||||
|
||||
message Info {
|
||||
required int64 id = 1 [(gogoproto.nullable) = false];
|
||||
}
|
||||
328
raft/node.go
328
raft/node.go
@@ -1,273 +1,127 @@
|
||||
// Package raft implements raft.
|
||||
package raft
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"time"
|
||||
)
|
||||
import "code.google.com/p/go.net/context"
|
||||
|
||||
type Interface interface {
|
||||
Step(m Message) bool
|
||||
Msgs() []Message
|
||||
type stateResp struct {
|
||||
st State
|
||||
ents, cents []Entry
|
||||
msgs []Message
|
||||
}
|
||||
|
||||
type tick int64
|
||||
func (a State) Equal(b State) bool {
|
||||
return a.Term == b.Term && a.Vote == b.Vote && a.LastIndex == b.LastIndex
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
NodeId int64
|
||||
Addr string
|
||||
Context []byte
|
||||
func (sr stateResp) containsUpdates(prev stateResp) bool {
|
||||
return !prev.st.Equal(sr.st) || len(sr.ents) > 0 || len(sr.cents) > 0 || len(sr.msgs) > 0
|
||||
}
|
||||
|
||||
type Node struct {
|
||||
sm *stateMachine
|
||||
|
||||
elapsed tick
|
||||
electionRand tick
|
||||
election tick
|
||||
heartbeat tick
|
||||
|
||||
// TODO: it needs garbage collection later
|
||||
rmNodes map[int64]struct{}
|
||||
removed bool
|
||||
ctx context.Context
|
||||
propc chan []byte
|
||||
recvc chan Message
|
||||
statec chan stateResp
|
||||
tickc chan struct{}
|
||||
}
|
||||
|
||||
func New(id int64, heartbeat, election tick) *Node {
|
||||
if election < heartbeat*3 {
|
||||
panic("election is least three times as heartbeat [election: %d, heartbeat: %d]")
|
||||
}
|
||||
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
func Start(ctx context.Context, id int64, peers []int64) *Node {
|
||||
n := &Node{
|
||||
heartbeat: heartbeat,
|
||||
election: election,
|
||||
electionRand: election + tick(rand.Int31())%election,
|
||||
sm: newStateMachine(id, []int64{id}),
|
||||
rmNodes: make(map[int64]struct{}),
|
||||
ctx: ctx,
|
||||
propc: make(chan []byte),
|
||||
recvc: make(chan Message),
|
||||
statec: make(chan stateResp),
|
||||
tickc: make(chan struct{}),
|
||||
}
|
||||
|
||||
r := newRaft(id, peers)
|
||||
go n.run(r)
|
||||
return n
|
||||
}
|
||||
|
||||
func Recover(id int64, s *Snapshot, ents []Entry, state State, heartbeat, election tick) *Node {
|
||||
n := New(id, heartbeat, election)
|
||||
if s != nil {
|
||||
n.sm.restore(*s)
|
||||
}
|
||||
n.sm.loadEnts(ents)
|
||||
if !state.IsEmpty() {
|
||||
n.sm.loadState(state)
|
||||
}
|
||||
return n
|
||||
}
|
||||
func (n *Node) run(r *raft) {
|
||||
propc := n.propc
|
||||
statec := n.statec
|
||||
|
||||
func (n *Node) Id() int64 { return n.sm.id }
|
||||
|
||||
func (n *Node) ClusterId() int64 { return n.sm.clusterId }
|
||||
|
||||
func (n *Node) Info() Info {
|
||||
return Info{Id: n.Id()}
|
||||
}
|
||||
|
||||
func (n *Node) Index() int64 { return n.sm.index.Get() }
|
||||
|
||||
func (n *Node) Term() int64 { return n.sm.term.Get() }
|
||||
|
||||
func (n *Node) Applied() int64 { return n.sm.raftLog.applied }
|
||||
|
||||
func (n *Node) HasLeader() bool { return n.Leader() != none }
|
||||
|
||||
func (n *Node) IsLeader() bool { return n.Leader() == n.Id() }
|
||||
|
||||
func (n *Node) Leader() int64 { return n.sm.lead.Get() }
|
||||
|
||||
func (n *Node) IsRemoved() bool { return n.removed }
|
||||
|
||||
func (n *Node) Nodes() []int64 {
|
||||
nodes := make(int64Slice, 0, len(n.sm.ins))
|
||||
for k := range n.sm.ins {
|
||||
nodes = append(nodes, k)
|
||||
}
|
||||
sort.Sort(nodes)
|
||||
return nodes
|
||||
}
|
||||
|
||||
// Propose asynchronously proposes data be applied to the underlying state machine.
|
||||
func (n *Node) Propose(data []byte) { n.propose(Normal, data) }
|
||||
|
||||
func (n *Node) propose(t int64, data []byte) {
|
||||
n.Step(Message{From: n.sm.id, ClusterId: n.ClusterId(), Type: msgProp, Entries: []Entry{{Type: t, Data: data}}})
|
||||
}
|
||||
|
||||
func (n *Node) Campaign() { n.Step(Message{From: n.sm.id, ClusterId: n.ClusterId(), Type: msgHup}) }
|
||||
|
||||
func (n *Node) InitCluster(clusterId int64) {
|
||||
d := make([]byte, 10)
|
||||
wn := binary.PutVarint(d, clusterId)
|
||||
n.propose(ClusterInit, d[:wn])
|
||||
}
|
||||
|
||||
func (n *Node) Add(id int64, addr string, context []byte) {
|
||||
n.UpdateConf(AddNode, &Config{NodeId: id, Addr: addr, Context: context})
|
||||
}
|
||||
|
||||
func (n *Node) Remove(id int64) {
|
||||
n.UpdateConf(RemoveNode, &Config{NodeId: id})
|
||||
}
|
||||
|
||||
func (n *Node) Msgs() []Message { return n.sm.Msgs() }
|
||||
|
||||
func (n *Node) Step(m Message) bool {
|
||||
if m.Type == msgDenied {
|
||||
n.removed = true
|
||||
return false
|
||||
}
|
||||
if n.ClusterId() != none && m.ClusterId != none && m.ClusterId != n.ClusterId() {
|
||||
log.Printf("deny message from=%d cluster=%d", m.From, m.ClusterId)
|
||||
n.sm.send(Message{To: m.From, ClusterId: n.ClusterId(), Type: msgDenied})
|
||||
return true
|
||||
}
|
||||
|
||||
if _, ok := n.rmNodes[m.From]; ok {
|
||||
if m.From != n.sm.id {
|
||||
n.sm.send(Message{To: m.From, ClusterId: n.ClusterId(), Type: msgDenied})
|
||||
var prev stateResp
|
||||
for {
|
||||
if r.hasLeader() {
|
||||
propc = n.propc
|
||||
} else {
|
||||
// We cannot accept proposals because we don't know who
|
||||
// to send them to, so we'll apply back-pressure and
|
||||
// block senders.
|
||||
propc = nil
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
l := len(n.sm.msgs)
|
||||
sr := stateResp{
|
||||
r.State,
|
||||
r.raftLog.unstableEnts(),
|
||||
r.raftLog.nextEnts(),
|
||||
r.msgs,
|
||||
}
|
||||
|
||||
if !n.sm.Step(m) {
|
||||
return false
|
||||
}
|
||||
if sr.containsUpdates(prev) {
|
||||
statec = n.statec
|
||||
} else {
|
||||
statec = nil
|
||||
}
|
||||
|
||||
for _, m := range n.sm.msgs[l:] {
|
||||
switch m.Type {
|
||||
case msgAppResp:
|
||||
// We just heard from the leader of the same term.
|
||||
n.elapsed = 0
|
||||
case msgVoteResp:
|
||||
// We just heard from the candidate the node voted for.
|
||||
if m.Index >= 0 {
|
||||
n.elapsed = 0
|
||||
}
|
||||
select {
|
||||
case p := <-propc:
|
||||
r.propose(p)
|
||||
case m := <-n.recvc:
|
||||
r.Step(m) // raft never returns an error
|
||||
case <-n.tickc:
|
||||
// r.tick()
|
||||
case statec <- sr:
|
||||
r.raftLog.resetNextEnts()
|
||||
r.raftLog.resetUnstable()
|
||||
r.msgs = nil
|
||||
case <-n.ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// Next returns all the appliable entries
|
||||
func (n *Node) Next() []Entry {
|
||||
ents := n.sm.nextEnts()
|
||||
for i := range ents {
|
||||
switch ents[i].Type {
|
||||
case Normal:
|
||||
case ClusterInit:
|
||||
cid, nr := binary.Varint(ents[i].Data)
|
||||
if nr <= 0 {
|
||||
panic("init cluster failed: cannot read clusterId")
|
||||
}
|
||||
if n.ClusterId() != -1 {
|
||||
panic("cannot init a started cluster")
|
||||
}
|
||||
n.sm.clusterId = cid
|
||||
case AddNode:
|
||||
c := new(Config)
|
||||
if err := json.Unmarshal(ents[i].Data, c); err != nil {
|
||||
log.Printf("raft: err=%q", err)
|
||||
continue
|
||||
}
|
||||
n.sm.addNode(c.NodeId)
|
||||
delete(n.rmNodes, c.NodeId)
|
||||
case RemoveNode:
|
||||
c := new(Config)
|
||||
if err := json.Unmarshal(ents[i].Data, c); err != nil {
|
||||
log.Printf("raft: err=%q", err)
|
||||
continue
|
||||
}
|
||||
n.sm.removeNode(c.NodeId)
|
||||
n.rmNodes[c.NodeId] = struct{}{}
|
||||
if c.NodeId == n.sm.id {
|
||||
n.removed = true
|
||||
}
|
||||
default:
|
||||
panic("unexpected entry type")
|
||||
}
|
||||
}
|
||||
return ents
|
||||
}
|
||||
|
||||
// Tick triggers the node to do a tick.
|
||||
// If the current elapsed is greater or equal than the timeout,
|
||||
// node will send corresponding message to the statemachine.
|
||||
func (n *Node) Tick() {
|
||||
if !n.sm.promotable {
|
||||
return
|
||||
}
|
||||
|
||||
timeout, msgType := n.electionRand, msgHup
|
||||
if n.sm.state == stateLeader {
|
||||
timeout, msgType = n.heartbeat, msgBeat
|
||||
}
|
||||
if n.elapsed >= timeout {
|
||||
n.Step(Message{From: n.sm.id, ClusterId: n.ClusterId(), Type: msgType})
|
||||
n.elapsed = 0
|
||||
if n.sm.state != stateLeader {
|
||||
n.electionRand = n.election + tick(rand.Int31())%n.election
|
||||
}
|
||||
} else {
|
||||
n.elapsed++
|
||||
func (n *Node) Tick() error {
|
||||
select {
|
||||
case n.tickc <- struct{}{}:
|
||||
return nil
|
||||
case <-n.ctx.Done():
|
||||
return n.ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// IsEmpty returns ture if the log of the node is empty.
|
||||
func (n *Node) IsEmpty() bool {
|
||||
return n.sm.raftLog.isEmpty()
|
||||
}
|
||||
|
||||
func (n *Node) UpdateConf(t int64, c *Config) {
|
||||
data, err := json.Marshal(c)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
// Propose proposes data be appended to the log.
|
||||
func (n *Node) Propose(ctx context.Context, data []byte) error {
|
||||
select {
|
||||
case n.propc <- data:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-n.ctx.Done():
|
||||
return n.ctx.Err()
|
||||
}
|
||||
n.propose(t, data)
|
||||
}
|
||||
|
||||
// UnstableEnts retuens all the entries that need to be persistent.
|
||||
// The first return value is offset, and the second one is unstable entries.
|
||||
func (n *Node) UnstableEnts() []Entry {
|
||||
return n.sm.raftLog.unstableEnts()
|
||||
}
|
||||
|
||||
func (n *Node) UnstableState() State {
|
||||
if n.sm.unstableState.IsEmpty() {
|
||||
return EmptyState
|
||||
// Step advances the state machine using m.
|
||||
func (n *Node) Step(m Message) error {
|
||||
select {
|
||||
case n.recvc <- m:
|
||||
return nil
|
||||
case <-n.ctx.Done():
|
||||
return n.ctx.Err()
|
||||
}
|
||||
s := n.sm.unstableState
|
||||
n.sm.clearState()
|
||||
return s
|
||||
}
|
||||
|
||||
func (n *Node) UnstableSnapshot() Snapshot {
|
||||
if n.sm.raftLog.unstableSnapshot.IsEmpty() {
|
||||
return emptySnapshot
|
||||
// ReadState returns the current point-in-time state.
|
||||
func (n *Node) ReadState(ctx context.Context) (st State, ents, cents []Entry, msgs []Message, err error) {
|
||||
select {
|
||||
case sr := <-n.statec:
|
||||
return sr.st, sr.ents, sr.cents, sr.msgs, nil
|
||||
case <-ctx.Done():
|
||||
return State{}, nil, nil, nil, ctx.Err()
|
||||
case <-n.ctx.Done():
|
||||
return State{}, nil, nil, nil, n.ctx.Err()
|
||||
}
|
||||
s := n.sm.raftLog.unstableSnapshot
|
||||
n.sm.raftLog.unstableSnapshot = emptySnapshot
|
||||
return s
|
||||
}
|
||||
|
||||
func (n *Node) GetSnap() Snapshot {
|
||||
return n.sm.raftLog.snapshot
|
||||
}
|
||||
|
||||
func (n *Node) Compact(d []byte) {
|
||||
n.sm.compact(d)
|
||||
}
|
||||
|
||||
func (n *Node) EntsLen() int {
|
||||
return len(n.sm.raftLog.ents)
|
||||
}
|
||||
|
||||
@@ -1,224 +1,5 @@
|
||||
package raft
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
import "testing"
|
||||
|
||||
const (
|
||||
defaultHeartbeat = 1
|
||||
defaultElection = 5
|
||||
)
|
||||
|
||||
func TestTickMsgHup(t *testing.T) {
|
||||
n := New(0, defaultHeartbeat, defaultElection)
|
||||
n.sm = newStateMachine(0, []int64{0, 1, 2})
|
||||
n.sm.promotable = true
|
||||
|
||||
for i := 0; i < defaultElection*2; i++ {
|
||||
n.Tick()
|
||||
}
|
||||
|
||||
called := false
|
||||
for _, m := range n.Msgs() {
|
||||
if m.Type == msgVote {
|
||||
called = true
|
||||
}
|
||||
}
|
||||
|
||||
if !called {
|
||||
t.Errorf("called = %v, want true", called)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTickMsgBeat(t *testing.T) {
|
||||
k := 3
|
||||
n := dictate(New(0, defaultHeartbeat, defaultElection))
|
||||
n.Next()
|
||||
for i := 1; i < k; i++ {
|
||||
n.Add(int64(i), "", nil)
|
||||
for _, m := range n.Msgs() {
|
||||
if m.Type == msgApp {
|
||||
n.Step(Message{From: m.To, ClusterId: m.ClusterId, Type: msgAppResp, Index: m.Index + int64(len(m.Entries))})
|
||||
}
|
||||
}
|
||||
// ignore commit index update messages
|
||||
n.Msgs()
|
||||
n.Next()
|
||||
}
|
||||
|
||||
for i := 0; i < defaultHeartbeat+1; i++ {
|
||||
n.Tick()
|
||||
}
|
||||
|
||||
called := 0
|
||||
for _, m := range n.Msgs() {
|
||||
if m.Type == msgApp && len(m.Entries) == 0 {
|
||||
called++
|
||||
}
|
||||
}
|
||||
|
||||
// msgBeat -> k-1 append
|
||||
w := k - 1
|
||||
if called != w {
|
||||
t.Errorf("called = %v, want %v", called, w)
|
||||
}
|
||||
}
|
||||
|
||||
func TestResetElapse(t *testing.T) {
|
||||
tests := []struct {
|
||||
msg Message
|
||||
welapsed tick
|
||||
}{
|
||||
{Message{From: 0, To: 1, Type: msgApp, Term: 2, Entries: []Entry{{Term: 1}}}, 0},
|
||||
{Message{From: 0, To: 1, Type: msgApp, Term: 1, Entries: []Entry{{Term: 1}}}, 1},
|
||||
{Message{From: 0, To: 1, Type: msgVote, Term: 2, Index: 1, LogTerm: 1}, 0},
|
||||
{Message{From: 0, To: 1, Type: msgVote, Term: 1}, 1},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
n := New(0, defaultHeartbeat, defaultElection)
|
||||
n.sm = newStateMachine(0, []int64{0, 1, 2})
|
||||
n.sm.promotable = true
|
||||
n.sm.raftLog.append(0, Entry{Type: Normal, Term: 1})
|
||||
n.sm.term = 2
|
||||
n.sm.raftLog.committed = 1
|
||||
|
||||
n.Tick()
|
||||
if n.elapsed != 1 {
|
||||
t.Errorf("%d: elpased = %d, want %d", i, n.elapsed, 1)
|
||||
}
|
||||
|
||||
n.Step(tt.msg)
|
||||
if n.elapsed != tt.welapsed {
|
||||
t.Errorf("%d: elpased = %d, want %d", i, n.elapsed, tt.welapsed)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestStartCluster(t *testing.T) {
|
||||
n := dictate(New(0, defaultHeartbeat, defaultElection))
|
||||
n.Next()
|
||||
|
||||
if len(n.sm.ins) != 1 {
|
||||
t.Errorf("k = %d, want 1", len(n.sm.ins))
|
||||
}
|
||||
if n.sm.id != 0 {
|
||||
t.Errorf("id = %d, want 0", n.sm.id)
|
||||
}
|
||||
if n.sm.state != stateLeader {
|
||||
t.Errorf("state = %s, want %s", n.sm.state, stateLeader)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAdd(t *testing.T) {
|
||||
n := dictate(New(0, defaultHeartbeat, defaultElection))
|
||||
n.Next()
|
||||
n.Add(1, "", nil)
|
||||
n.Next()
|
||||
|
||||
if len(n.sm.ins) != 2 {
|
||||
t.Errorf("k = %d, want 2", len(n.sm.ins))
|
||||
}
|
||||
if n.sm.id != 0 {
|
||||
t.Errorf("id = %d, want 0", n.sm.id)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemove(t *testing.T) {
|
||||
n := dictate(New(0, defaultHeartbeat, defaultElection))
|
||||
n.Next()
|
||||
n.Add(1, "", nil)
|
||||
n.Next()
|
||||
n.Remove(0)
|
||||
n.Step(Message{Type: msgAppResp, From: 1, ClusterId: n.ClusterId(), Term: 1, Index: 5})
|
||||
n.Next()
|
||||
|
||||
if len(n.sm.ins) != 1 {
|
||||
t.Errorf("k = %d, want 1", len(n.sm.ins))
|
||||
}
|
||||
if n.sm.id != 0 {
|
||||
t.Errorf("id = %d, want 0", n.sm.id)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDenial(t *testing.T) {
|
||||
logents := []Entry{
|
||||
{Type: AddNode, Term: 1, Data: []byte(`{"NodeId":1}`)},
|
||||
{Type: AddNode, Term: 1, Data: []byte(`{"NodeId":2}`)},
|
||||
{Type: RemoveNode, Term: 1, Data: []byte(`{"NodeId":2}`)},
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
ent Entry
|
||||
wdenied map[int64]bool
|
||||
}{
|
||||
{
|
||||
Entry{Type: AddNode, Term: 1, Data: []byte(`{"NodeId":2}`)},
|
||||
map[int64]bool{1: false, 2: false},
|
||||
},
|
||||
{
|
||||
Entry{Type: RemoveNode, Term: 1, Data: []byte(`{"NodeId":1}`)},
|
||||
map[int64]bool{1: true, 2: true},
|
||||
},
|
||||
{
|
||||
Entry{Type: RemoveNode, Term: 1, Data: []byte(`{"NodeId":0}`)},
|
||||
map[int64]bool{1: false, 2: true},
|
||||
},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
n := dictate(New(0, defaultHeartbeat, defaultElection))
|
||||
n.Next()
|
||||
n.Msgs()
|
||||
n.sm.raftLog.append(n.sm.raftLog.committed, append(logents, tt.ent)...)
|
||||
n.sm.raftLog.committed += int64(len(logents) + 1)
|
||||
n.Next()
|
||||
|
||||
for id, denied := range tt.wdenied {
|
||||
n.Step(Message{From: id, To: 0, ClusterId: n.ClusterId(), Type: msgApp, Term: 1})
|
||||
w := []Message{}
|
||||
if denied {
|
||||
w = []Message{{From: 0, To: id, ClusterId: n.ClusterId(), Term: 1, Type: msgDenied}}
|
||||
}
|
||||
if g := n.Msgs(); !reflect.DeepEqual(g, w) {
|
||||
t.Errorf("#%d: msgs for %d = %+v, want %+v", i, id, g, w)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRecover(t *testing.T) {
|
||||
ents := []Entry{{Term: 1}, {Term: 2}, {Term: 3}}
|
||||
state := State{Term: 500, Vote: 1, Commit: 3}
|
||||
|
||||
n := Recover(0, nil, ents, state, defaultHeartbeat, defaultElection)
|
||||
if g := n.Next(); !reflect.DeepEqual(g, ents) {
|
||||
t.Errorf("ents = %+v, want %+v", g, ents)
|
||||
}
|
||||
if g := n.sm.term; g.Get() != state.Term {
|
||||
t.Errorf("term = %d, want %d", g, state.Term)
|
||||
}
|
||||
if g := n.sm.vote; g != state.Vote {
|
||||
t.Errorf("vote = %d, want %d", g, state.Vote)
|
||||
}
|
||||
if g := n.sm.raftLog.committed; g != state.Commit {
|
||||
t.Errorf("committed = %d, want %d", g, state.Commit)
|
||||
}
|
||||
if g := n.UnstableEnts(); g != nil {
|
||||
t.Errorf("unstableEnts = %+v, want nil", g)
|
||||
}
|
||||
if g := n.UnstableState(); !reflect.DeepEqual(g, state) {
|
||||
t.Errorf("unstableState = %+v, want %+v", g, state)
|
||||
}
|
||||
if g := n.Msgs(); len(g) != 0 {
|
||||
t.Errorf("#%d: len(msgs) = %d, want 0", len(g))
|
||||
}
|
||||
}
|
||||
|
||||
func dictate(n *Node) *Node {
|
||||
n.Step(Message{From: n.Id(), Type: msgHup})
|
||||
n.InitCluster(0xBEEF)
|
||||
n.Add(n.Id(), "", nil)
|
||||
return n
|
||||
}
|
||||
func TestNode(t *testing.T) {}
|
||||
|
||||
127
raft2/node.go
127
raft2/node.go
@@ -1,127 +0,0 @@
|
||||
// Package raft implements raft.
|
||||
package raft
|
||||
|
||||
import "code.google.com/p/go.net/context"
|
||||
|
||||
type stateResp struct {
|
||||
st State
|
||||
ents, cents []Entry
|
||||
msgs []Message
|
||||
}
|
||||
|
||||
func (a State) Equal(b State) bool {
|
||||
return a.Term == b.Term && a.Vote == b.Vote && a.LastIndex == b.LastIndex
|
||||
}
|
||||
|
||||
func (sr stateResp) containsUpdates(prev stateResp) bool {
|
||||
return !prev.st.Equal(sr.st) || len(sr.ents) > 0 || len(sr.cents) > 0 || len(sr.msgs) > 0
|
||||
}
|
||||
|
||||
type Node struct {
|
||||
ctx context.Context
|
||||
propc chan []byte
|
||||
recvc chan Message
|
||||
statec chan stateResp
|
||||
tickc chan struct{}
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, id int64, peers []int64) *Node {
|
||||
n := &Node{
|
||||
ctx: ctx,
|
||||
propc: make(chan []byte),
|
||||
recvc: make(chan Message),
|
||||
statec: make(chan stateResp),
|
||||
tickc: make(chan struct{}),
|
||||
}
|
||||
r := newRaft(id, peers)
|
||||
go n.run(r)
|
||||
return n
|
||||
}
|
||||
|
||||
func (n *Node) run(r *raft) {
|
||||
propc := n.propc
|
||||
statec := n.statec
|
||||
|
||||
var prev stateResp
|
||||
for {
|
||||
if r.hasLeader() {
|
||||
propc = n.propc
|
||||
} else {
|
||||
// We cannot accept proposals because we don't know who
|
||||
// to send them to, so we'll apply back-pressure and
|
||||
// block senders.
|
||||
propc = nil
|
||||
}
|
||||
|
||||
sr := stateResp{
|
||||
r.State,
|
||||
r.raftLog.unstableEnts(),
|
||||
r.raftLog.nextEnts(),
|
||||
r.msgs,
|
||||
}
|
||||
|
||||
if sr.containsUpdates(prev) {
|
||||
statec = n.statec
|
||||
} else {
|
||||
statec = nil
|
||||
}
|
||||
|
||||
select {
|
||||
case p := <-propc:
|
||||
r.propose(p)
|
||||
case m := <-n.recvc:
|
||||
r.Step(m) // raft never returns an error
|
||||
case <-n.tickc:
|
||||
// r.tick()
|
||||
case statec <- sr:
|
||||
r.raftLog.resetNextEnts()
|
||||
r.raftLog.resetUnstable()
|
||||
r.msgs = nil
|
||||
case <-n.ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Node) Tick() error {
|
||||
select {
|
||||
case n.tickc <- struct{}{}:
|
||||
return nil
|
||||
case <-n.ctx.Done():
|
||||
return n.ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// Propose proposes data be appended to the log.
|
||||
func (n *Node) Propose(ctx context.Context, data []byte) error {
|
||||
select {
|
||||
case n.propc <- data:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-n.ctx.Done():
|
||||
return n.ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// Step advances the state machine using m.
|
||||
func (n *Node) Step(m Message) error {
|
||||
select {
|
||||
case n.recvc <- m:
|
||||
return nil
|
||||
case <-n.ctx.Done():
|
||||
return n.ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// ReadState returns the current point-in-time state.
|
||||
func (n *Node) ReadState(ctx context.Context) (st State, ents, cents []Entry, msgs []Message, err error) {
|
||||
select {
|
||||
case sr := <-n.statec:
|
||||
return sr.st, sr.ents, sr.cents, sr.msgs, nil
|
||||
case <-ctx.Done():
|
||||
return State{}, nil, nil, nil, ctx.Err()
|
||||
case <-n.ctx.Done():
|
||||
return State{}, nil, nil, nil, n.ctx.Err()
|
||||
}
|
||||
}
|
||||
@@ -1,5 +0,0 @@
|
||||
package raft
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestNode(t *testing.T) {}
|
||||
Reference in New Issue
Block a user