raft: change index and term to int64

This commit is contained in:
Xiang Li 2014-07-10 22:51:37 -07:00 committed by Yicheng Qin
parent e11c7f35b4
commit 30f4d9faea
10 changed files with 123 additions and 121 deletions

View File

@ -184,7 +184,7 @@ func (s *Server) run() {
}
func (s *Server) apply(ents []raft.Entry) {
offset := s.node.Applied() - len(ents) + 1
offset := s.node.Applied() - int64(len(ents)) + 1
for i, ent := range ents {
switch ent.Type {
// expose raft entry type
@ -192,7 +192,7 @@ func (s *Server) apply(ents []raft.Entry) {
if len(ent.Data) == 0 {
continue
}
s.v2apply(offset+i, ent)
s.v2apply(offset+int64(i), ent)
case raft.AddNode:
cfg := new(raft.Config)
if err := json.Unmarshal(ent.Data, cfg); err != nil {

View File

@ -9,7 +9,7 @@ import (
"github.com/coreos/etcd/store"
)
func (s *Server) v2apply(index int, ent raft.Entry) {
func (s *Server) v2apply(index int64, ent raft.Entry) {
var ret interface{}
var e *store.Event
var err error

View File

@ -14,14 +14,14 @@ type v2Proposal struct {
}
type wait struct {
index int
term int
index int64
term int64
}
type v2Raft struct {
*raft.Node
result map[wait]chan interface{}
term int
term int64
}
func (r *v2Raft) Propose(p v2Proposal) error {

View File

@ -3,7 +3,7 @@ package raft
import "fmt"
const (
Normal int = iota
Normal int64 = iota
AddNode
RemoveNode
@ -14,8 +14,8 @@ const (
)
type Entry struct {
Type int
Term int
Type int64
Term int64
Data []byte
}
@ -25,13 +25,13 @@ func (e *Entry) isConfig() bool {
type log struct {
ents []Entry
committed int
applied int
offset int
committed int64
applied int64
offset int64
// want a compact after the number of entries exceeds the threshold
// TODO(xiangli) size might be a better criteria
compactThreshold int
compactThreshold int64
}
func newLog() *log {
@ -43,7 +43,7 @@ func newLog() *log {
}
}
func (l *log) maybeAppend(index, logTerm, committed int, ents ...Entry) bool {
func (l *log) maybeAppend(index, logTerm, committed int64, ents ...Entry) bool {
if l.matchTerm(index, logTerm) {
l.append(index, ents...)
l.committed = committed
@ -52,23 +52,23 @@ func (l *log) maybeAppend(index, logTerm, committed int, ents ...Entry) bool {
return false
}
func (l *log) append(after int, ents ...Entry) int {
func (l *log) append(after int64, ents ...Entry) int64 {
l.ents = append(l.slice(l.offset, after+1), ents...)
return l.lastIndex()
}
func (l *log) lastIndex() int {
return len(l.ents) - 1 + l.offset
func (l *log) lastIndex() int64 {
return int64(len(l.ents)) - 1 + l.offset
}
func (l *log) term(i int) int {
func (l *log) term(i int64) int64 {
if e := l.at(i); e != nil {
return e.Term
}
return -1
}
func (l *log) entries(i int) []Entry {
func (l *log) entries(i int64) []Entry {
// never send out the first entry
// first entry is only used for matching
// prevLogTerm
@ -78,19 +78,19 @@ func (l *log) entries(i int) []Entry {
return l.slice(i, l.lastIndex()+1)
}
func (l *log) isUpToDate(i, term int) bool {
func (l *log) isUpToDate(i, term int64) bool {
e := l.at(l.lastIndex())
return term > e.Term || (term == e.Term && i >= l.lastIndex())
}
func (l *log) matchTerm(i, term int) bool {
func (l *log) matchTerm(i, term int64) bool {
if e := l.at(i); e != nil {
return e.Term == term
}
return false
}
func (l *log) maybeCommit(maxIndex, term int) bool {
func (l *log) maybeCommit(maxIndex, term int64) bool {
if maxIndex > l.committed && l.term(maxIndex) == term {
l.committed = maxIndex
return true
@ -112,27 +112,27 @@ func (l *log) nextEnts() (ents []Entry) {
// i must be not smaller than the index of the first entry
// and not greater than the index of the last entry.
// the number of entries after compaction will be returned.
func (l *log) compact(i int) int {
func (l *log) compact(i int64) int64 {
if l.isOutOfBounds(i) {
panic(fmt.Sprintf("compact %d out of bounds [%d:%d]", i, l.offset, l.lastIndex()))
}
l.ents = l.slice(i, l.lastIndex()+1)
l.offset = i
return len(l.ents)
return int64(len(l.ents))
}
func (l *log) shouldCompact() bool {
return (l.applied - l.offset) > l.compactThreshold
}
func (l *log) restore(index, term int) {
func (l *log) restore(index, term int64) {
l.ents = []Entry{{Term: term}}
l.committed = index
l.applied = index
l.offset = index
}
func (l *log) at(i int) *Entry {
func (l *log) at(i int64) *Entry {
if l.isOutOfBounds(i) {
return nil
}
@ -140,7 +140,7 @@ func (l *log) at(i int) *Entry {
}
// slice get a slice of log entries from lo through hi-1, inclusive.
func (l *log) slice(lo int, hi int) []Entry {
func (l *log) slice(lo int64, hi int64) []Entry {
if lo >= hi {
return nil
}
@ -150,7 +150,7 @@ func (l *log) slice(lo int, hi int) []Entry {
return l.ents[lo-l.offset : hi-l.offset]
}
func (l *log) isOutOfBounds(i int) bool {
func (l *log) isOutOfBounds(i int64) bool {
if i < l.offset || i > l.lastIndex() {
return true
}

View File

@ -8,11 +8,12 @@ import (
// TestCompactionSideEffects ensures that all the log related funcationality works correctly after
// a compaction.
func TestCompactionSideEffects(t *testing.T) {
lastIndex := 1000
var i int64
lastIndex := int64(1000)
log := newLog()
for i := 0; i < lastIndex; i++ {
log.append(i, Entry{Term: i + 1})
for i = 0; i < lastIndex; i++ {
log.append(int64(i), Entry{Term: int64(i + 1)})
}
log.compact(500)
@ -49,15 +50,15 @@ func TestCompactionSideEffects(t *testing.T) {
func TestCompaction(t *testing.T) {
tests := []struct {
app int
compact []int
compact []int64
wleft []int
wallow bool
}{
// out of upper bound
{1000, []int{1001}, []int{-1}, false},
{1000, []int{300, 500, 800, 900}, []int{701, 501, 201, 101}, true},
{1000, []int64{1001}, []int{-1}, false},
{1000, []int64{300, 500, 800, 900}, []int{701, 501, 201, 101}, true},
// out of lower bound
{1000, []int{300, 299}, []int{701, -1}, false},
{1000, []int64{300, 299}, []int{701, -1}, false},
}
for i, tt := range tests {
@ -72,7 +73,7 @@ func TestCompaction(t *testing.T) {
log := newLog()
for i := 0; i < tt.app; i++ {
log.append(i, Entry{})
log.append(int64(i), Entry{})
}
for j := 0; j < len(tt.compact); j++ {
@ -86,13 +87,14 @@ func TestCompaction(t *testing.T) {
}
func TestLogRestore(t *testing.T) {
var i int64
log := newLog()
for i := 0; i < 100; i++ {
for i = 0; i < 100; i++ {
log.append(i, Entry{Term: i + 1})
}
index := 1000
term := 1000
index := int64(1000)
term := int64(1000)
log.restore(index, term)
// only has the guard entry
@ -114,12 +116,12 @@ func TestLogRestore(t *testing.T) {
}
func TestIsOutOfBounds(t *testing.T) {
offset := 100
num := 100
offset := int64(100)
num := int64(100)
l := &log{offset: offset, ents: make([]Entry, num)}
tests := []struct {
index int
index int64
w bool
}{
{offset - 1, true},
@ -138,16 +140,17 @@ func TestIsOutOfBounds(t *testing.T) {
}
func TestAt(t *testing.T) {
offset := 100
num := 100
var i int64
offset := int64(100)
num := int64(100)
l := &log{offset: offset}
for i := 0; i < num; i++ {
for i = 0; i < num; i++ {
l.ents = append(l.ents, Entry{Term: i})
}
tests := []struct {
index int
index int64
w *Entry
}{
{offset - 1, nil},
@ -166,17 +169,18 @@ func TestAt(t *testing.T) {
}
func TestSlice(t *testing.T) {
offset := 100
num := 100
var i int64
offset := int64(100)
num := int64(100)
l := &log{offset: offset}
for i := 0; i < num; i++ {
for i = 0; i < num; i++ {
l.ents = append(l.ents, Entry{Term: i})
}
tests := []struct {
from int
to int
from int64
to int64
w []Entry
}{
{offset - 1, offset + 1, nil},

View File

@ -11,7 +11,7 @@ type Interface interface {
Msgs() []Message
}
type tick int
type tick int64
type Config struct {
NodeId int64
@ -45,11 +45,11 @@ func (n *Node) Id() int64 {
return atomic.LoadInt64(&n.sm.id)
}
func (n *Node) Index() int { return n.sm.log.lastIndex() }
func (n *Node) Index() int64 { return n.sm.log.lastIndex() }
func (n *Node) Term() int { return n.sm.term }
func (n *Node) Term() int64 { return n.sm.term }
func (n *Node) Applied() int { return n.sm.log.applied }
func (n *Node) Applied() int64 { return n.sm.log.applied }
func (n *Node) HasLeader() bool { return n.Leader() != none }
@ -60,7 +60,7 @@ func (n *Node) Leader() int64 { return n.sm.lead.Get() }
// Propose asynchronously proposes data be applied to the underlying state machine.
func (n *Node) Propose(data []byte) { n.propose(Normal, data) }
func (n *Node) propose(t int, data []byte) {
func (n *Node) propose(t int64, data []byte) {
n.Step(Message{Type: msgProp, Entries: []Entry{{Type: t, Data: data}}})
}
@ -141,7 +141,7 @@ func (n *Node) Tick() {
}
}
func (n *Node) updateConf(t int, c *Config) {
func (n *Node) updateConf(t int64, c *Config) {
data, err := json.Marshal(c)
if err != nil {
panic(err)

View File

@ -39,7 +39,7 @@ func TestTickMsgBeat(t *testing.T) {
n.Add(int64(i), "", nil)
for _, m := range n.Msgs() {
if m.Type == msgApp {
n.Step(Message{From: m.To, Type: msgAppResp, Index: m.Index + len(m.Entries)})
n.Step(Message{From: m.To, Type: msgAppResp, Index: m.Index + int64(len(m.Entries))})
}
}
// ignore commit index update messages

View File

@ -8,7 +8,7 @@ import (
const none = -1
type messageType int
type messageType int64
const (
msgHup messageType = iota
@ -33,7 +33,7 @@ var mtmap = [...]string{
}
func (mt messageType) String() string {
return mtmap[int(mt)]
return mtmap[int64(mt)]
}
var errNoLeader = errors.New("no leader")
@ -44,7 +44,7 @@ const (
stateLeader
)
type stateType int
type stateType int64
var stmap = [...]string{
stateFollower: "stateFollower",
@ -59,27 +59,27 @@ var stepmap = [...]stepFunc{
}
func (st stateType) String() string {
return stmap[int(st)]
return stmap[int64(st)]
}
type Message struct {
Type messageType
To int64
From int64
Term int
LogTerm int
Index int
PrevTerm int
Term int64
LogTerm int64
Index int64
PrevTerm int64
Entries []Entry
Commit int
Commit int64
Snapshot Snapshot
}
type index struct {
match, next int
match, next int64
}
func (in *index) update(n int) {
func (in *index) update(n int64) {
in.match = n
in.next = n + 1
}
@ -93,21 +93,26 @@ func (in *index) decr() {
// An AtomicInt is an int64 to be accessed atomically.
type atomicInt int64
// Add atomically adds n to i.
func (i *atomicInt) Set(n int64) {
atomic.StoreInt64((*int64)(i), n)
}
// Get atomically gets the value of i.
func (i *atomicInt) Get() int64 {
return atomic.LoadInt64((*int64)(i))
}
// int64Slice implements sort interface
type int64Slice []int64
func (p int64Slice) Len() int { return len(p) }
func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
type stateMachine struct {
id int64
// the term we are participating in at any time
term int
term int64
// who we voted for in term
vote int64
@ -194,11 +199,11 @@ func (sm *stateMachine) bcastAppend() {
func (sm *stateMachine) maybeCommit() bool {
// TODO(bmizerany): optimize.. Currently naive
mis := make([]int, 0, len(sm.ins))
mis := make(int64Slice, 0, len(sm.ins))
for i := range sm.ins {
mis = append(mis, sm.ins[i].match)
}
sort.Sort(sort.Reverse(sort.IntSlice(mis)))
sort.Sort(sort.Reverse(mis))
mci := mis[sm.q()-1]
return sm.log.maybeCommit(mci, sm.term)
@ -209,7 +214,7 @@ func (sm *stateMachine) nextEnts() (ents []Entry) {
return sm.log.nextEnts()
}
func (sm *stateMachine) reset(term int) {
func (sm *stateMachine) reset(term int64) {
sm.term = term
sm.lead.Set(none)
sm.vote = none
@ -240,7 +245,7 @@ func (sm *stateMachine) promotable() bool {
return sm.log.committed != 0
}
func (sm *stateMachine) becomeFollower(term int, lead int64) {
func (sm *stateMachine) becomeFollower(term int64, lead int64) {
sm.reset(term)
sm.lead.Set(lead)
sm.state = stateFollower
@ -449,7 +454,7 @@ func (sm *stateMachine) restore(s Snapshot) {
sm.snapshoter.Restore(s)
}
func (sm *stateMachine) needSnapshot(i int) bool {
func (sm *stateMachine) needSnapshot(i int64) bool {
if i < sm.log.offset {
if sm.snapshoter == nil {
panic("need snapshot but snapshoter is nil")

View File

@ -43,7 +43,7 @@ func TestLogReplication(t *testing.T) {
tests := []struct {
*network
msgs []Message
wcommitted int
wcommitted int64
}{
{
newNetwork(nil, nil, nil),
@ -214,7 +214,7 @@ func TestDuelingCandidates(t *testing.T) {
tests := []struct {
sm *stateMachine
state stateType
term int
term int64
log *log
}{
{a, stateFollower, 2, wlog},
@ -406,30 +406,30 @@ func TestProposalByProxy(t *testing.T) {
func TestCommit(t *testing.T) {
tests := []struct {
matches []int
matches []int64
logs []Entry
smTerm int
w int
smTerm int64
w int64
}{
// single
{[]int{1}, []Entry{{}, {Term: 1}}, 1, 1},
{[]int{1}, []Entry{{}, {Term: 1}}, 2, 0},
{[]int{2}, []Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
{[]int{1}, []Entry{{}, {Term: 2}}, 2, 1},
{[]int64{1}, []Entry{{}, {Term: 1}}, 1, 1},
{[]int64{1}, []Entry{{}, {Term: 1}}, 2, 0},
{[]int64{2}, []Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
{[]int64{1}, []Entry{{}, {Term: 2}}, 2, 1},
// odd
{[]int{2, 1, 1}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
{[]int{2, 1, 1}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
{[]int{2, 1, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
{[]int{2, 1, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
{[]int64{2, 1, 1}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
{[]int64{2, 1, 1}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
{[]int64{2, 1, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
{[]int64{2, 1, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
// even
{[]int{2, 1, 1, 1}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
{[]int{2, 1, 1, 1}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
{[]int{2, 1, 1, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
{[]int{2, 1, 1, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
{[]int{2, 1, 2, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
{[]int{2, 1, 2, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
{[]int64{2, 1, 1, 1}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
{[]int64{2, 1, 1, 1}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
{[]int64{2, 1, 1, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
{[]int64{2, 1, 1, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
{[]int64{2, 1, 2, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
{[]int64{2, 1, 2, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
}
for i, tt := range tests {
@ -448,9 +448,9 @@ func TestCommit(t *testing.T) {
func TestRecvMsgVote(t *testing.T) {
tests := []struct {
state stateType
i, term int
i, term int64
voteFor int64
w int
w int64
}{
{stateFollower, 0, 0, none, -1},
{stateFollower, 0, 1, none, -1},
@ -504,7 +504,7 @@ func TestStateTransition(t *testing.T) {
from stateType
to stateType
wallow bool
wterm int
wterm int64
wlead int64
}{
{stateFollower, stateFollower, true, 1, none},
@ -579,7 +579,7 @@ func TestConf(t *testing.T) {
// the uncommitted log entries
func TestConfChangeLeader(t *testing.T) {
tests := []struct {
et int
et int64
wPending bool
}{
{Normal, false},
@ -605,8 +605,8 @@ func TestAllServerStepdown(t *testing.T) {
state stateType
wstate stateType
wterm int
windex int
wterm int64
windex int64
}{
{stateFollower, stateFollower, 3, 1},
{stateCandidate, stateFollower, 3, 1},
@ -614,7 +614,7 @@ func TestAllServerStepdown(t *testing.T) {
}
tmsgTypes := [...]messageType{msgVote, msgApp}
tterm := 3
tterm := int64(3)
for i, tt := range tests {
sm := newStateMachine(0, []int64{0, 1, 2})
@ -637,7 +637,7 @@ func TestAllServerStepdown(t *testing.T) {
if sm.term != tt.wterm {
t.Errorf("#%d.%d term = %v , want %v", i, j, sm.term, tt.wterm)
}
if len(sm.log.ents) != tt.windex {
if int64(len(sm.log.ents)) != tt.windex {
t.Errorf("#%d.%d index = %v , want %v", i, j, len(sm.log.ents), tt.windex)
}
}
@ -646,10 +646,10 @@ func TestAllServerStepdown(t *testing.T) {
func TestLeaderAppResp(t *testing.T) {
tests := []struct {
index int
index int64
wmsgNum int
windex int
wcommitted int
windex int64
wcommitted int64
}{
{-1, 1, 1, 0}, // bad resp; leader does not commit; reply with log entries
{2, 2, 2, 2}, // good resp; leader commits; broadcast with commit index
@ -714,7 +714,7 @@ func TestRecvMsgBeat(t *testing.T) {
func TestMaybeCompact(t *testing.T) {
tests := []struct {
snapshoter Snapshoter
applied int
applied int64
wCompact bool
}{
{nil, defaultCompactThreshold + 1, false},
@ -726,7 +726,7 @@ func TestMaybeCompact(t *testing.T) {
sm := newStateMachine(0, []int64{0, 1, 2})
sm.setSnapshoter(tt.snapshoter)
for i := 0; i < defaultCompactThreshold*2; i++ {
sm.log.append(i, Entry{Term: i + 1})
sm.log.append(int64(i), Entry{Term: int64(i + 1)})
}
sm.log.applied = tt.applied
sm.log.committed = tt.applied
@ -891,7 +891,7 @@ func TestSlowNodeRestore(t *testing.T) {
}
}
func ents(terms ...int) *stateMachine {
func ents(terms ...int64) *stateMachine {
ents := []Entry{{}}
for _, term := range terms {
ents = append(ents, Entry{Term: term})
@ -1022,7 +1022,7 @@ type logSnapshoter struct {
snapshot Snapshot
}
func (s *logSnapshoter) Snap(index, term int, nodes []int64) {
func (s *logSnapshoter) Snap(index, term int64, nodes []int64) {
s.snapshot = Snapshot{
Index: index,
Term: term,
@ -1036,10 +1036,3 @@ func (s *logSnapshoter) Restore(ss Snapshot) {
func (s *logSnapshoter) GetSnap() Snapshot {
return s.snapshot
}
// int64Slice implements sort interface
type int64Slice []int64
func (p int64Slice) Len() int { return len(p) }
func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

View File

@ -6,15 +6,15 @@ type Snapshot struct {
// the configuration
Nodes []int64
// the index at which the snapshot was taken.
Index int
Index int64
// the log term of the index
Term int
Term int64
}
// A snapshoter can make a snapshot of its current state atomically.
// It can restore from a snapshot and get the latest snapshot it took.
type Snapshoter interface {
Snap(index, term int, nodes []int64)
Snap(index, term int64, nodes []int64)
Restore(snap Snapshot)
GetSnap() Snapshot
}