raft: move unstable related function to log_unstable.go

This commit is contained in:
Xiang Li 2014-11-26 14:40:58 -08:00
parent 7358ef21a2
commit 8a626257c7
3 changed files with 168 additions and 58 deletions

View File

@ -41,18 +41,6 @@ type raftLog struct {
applied uint64
}
// unstable.entris[i] has raft log position i+unstable.offset.
// Note that unstable.offset may be less than the highest log
// position in storage; this means that the next write to storage
// might need to truncate the log before persisting unstable.entries.
type unstable struct {
// the incoming unstable snapshot, if any.
snapshot *pb.Snapshot
// all entries that have not yet been written to storage.
entries []pb.Entry
offset uint64
}
// newLog returns log using the given storage. It recovers the log to the state
// that it just commits and applies the lastest snapshot.
func newLog(storage Storage) *raftLog {
@ -106,15 +94,7 @@ func (l *raftLog) append(after uint64, ents ...pb.Entry) uint64 {
if after < l.committed {
log.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
}
if after < l.unstable.offset {
// The log is being truncated to before our current unstable
// portion, so discard it and reset unstable.
l.unstable.entries = nil
l.unstable.offset = after + 1
}
// Truncate any unstable entries that are being replaced, then
// append the new ones.
l.unstable.entries = append(l.unstable.entries[:after+1-l.unstable.offset], ents...)
l.unstable.truncateAndAppend(after, ents)
return l.lastIndex()
}
@ -166,8 +146,8 @@ func (l *raftLog) snapshot() (pb.Snapshot, error) {
}
func (l *raftLog) firstIndex() uint64 {
if l.unstable.snapshot != nil {
return l.unstable.snapshot.Metadata.Index + 1
if i, ok := l.unstable.maybeFirstIndex(); ok {
return i
}
index, err := l.storage.FirstIndex()
if err != nil {
@ -177,7 +157,14 @@ func (l *raftLog) firstIndex() uint64 {
}
func (l *raftLog) lastIndex() uint64 {
return l.unstable.offset + uint64(len(l.unstable.entries)) - 1
if i, ok := l.unstable.maybeLastIndex(); ok {
return i
}
i, err := l.storage.LastIndex()
if err != nil {
panic(err) // TODO(bdarnell)
}
return i
}
func (l *raftLog) commitTo(tocommit uint64) {
@ -201,12 +188,11 @@ func (l *raftLog) appliedTo(i uint64) {
}
func (l *raftLog) stableTo(i uint64) {
if i < l.unstable.offset || i+1-l.unstable.offset > uint64(len(l.unstable.entries)) {
log.Panicf("stableTo(%d) is out of range [unstable(%d), len(unstableEnts)(%d)]",
i, l.unstable.offset, len(l.unstable.entries))
}
l.unstable.entries = l.unstable.entries[i+1-l.unstable.offset:]
l.unstable.offset = i + 1
l.unstable.stableTo(i)
}
func (l *raftLog) stableSnapTo(i uint64) {
l.unstable.stableSnapTo(i)
}
func (l *raftLog) lastTerm() uint64 {
@ -214,28 +200,22 @@ func (l *raftLog) lastTerm() uint64 {
}
func (l *raftLog) term(i uint64) uint64 {
switch {
case i > l.lastIndex():
if i > l.lastIndex() {
return 0
case i < l.unstable.offset:
if snap := l.unstable.snapshot; snap != nil {
if i == snap.Metadata.Index {
return snap.Metadata.Term
}
return 0
}
t, err := l.storage.Term(i)
switch err {
case nil:
return t
case ErrCompacted:
return 0
default:
panic(err) // TODO(bdarnell)
}
default:
return l.unstable.entries[i-l.unstable.offset].Term
}
if t, ok := l.unstable.maybeTerm(i); ok {
return t
}
t, err := l.storage.Term(i)
if err == nil {
return t
}
if err == ErrCompacted {
return 0
}
panic(err) // TODO(bdarnell)
}
func (l *raftLog) entries(i uint64) []pb.Entry {
@ -271,9 +251,7 @@ func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
func (l *raftLog) restore(s pb.Snapshot) {
l.committed = s.Metadata.Index
l.unstable.offset = l.committed + 1
l.unstable.entries = nil
l.unstable.snapshot = &s
l.unstable.restore(s)
}
// slice returns a slice of log entries from lo through hi-1, inclusive.
@ -297,8 +275,8 @@ func (l *raftLog) slice(lo uint64, hi uint64) []pb.Entry {
ents = append(ents, storedEnts...)
}
if hi > l.unstable.offset {
firstUnstable := max(lo, l.unstable.offset)
ents = append(ents, l.unstable.entries[firstUnstable-l.unstable.offset:hi-l.unstable.offset]...)
unstable := l.unstable.slice(max(lo, l.unstable.offset), hi)
ents = append(ents, unstable...)
}
return ents
}

134
raft/log_unstable.go Normal file
View File

@ -0,0 +1,134 @@
/*
Copyright 2014 CoreOS, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package raft
import (
"log"
pb "github.com/coreos/etcd/raft/raftpb"
)
// unstable.entris[i] has raft log position i+unstable.offset.
// Note that unstable.offset may be less than the highest log
// position in storage; this means that the next write to storage
// might need to truncate the log before persisting unstable.entries.
type unstable struct {
// the incoming unstable snapshot, if any.
snapshot *pb.Snapshot
// all entries that have not yet been written to storage.
entries []pb.Entry
offset uint64
}
// maybeFirstIndex returns the first index if it has a snapshot.
func (u *unstable) maybeFirstIndex() (uint64, bool) {
if u.snapshot != nil {
return u.snapshot.Metadata.Index, true
}
return 0, false
}
// maybeLastIndex returns the last index if it has at least one
// unstable entry or snapshot.
func (u *unstable) maybeLastIndex() (uint64, bool) {
if l := len(u.entries); l != 0 {
return u.offset + uint64(l) - 1, true
}
if u.snapshot != nil {
return u.snapshot.Metadata.Index, true
}
return 0, false
}
// myabeTerm returns the term of the entry at index i, if there
// is any.
func (u *unstable) maybeTerm(i uint64) (uint64, bool) {
if i < u.offset {
if u.snapshot == nil {
return 0, false
}
if u.snapshot.Metadata.Index == i {
return u.snapshot.Metadata.Term, true
}
return 0, false
}
last, ok := u.maybeLastIndex()
if !ok {
return 0, false
}
if i > last {
return 0, false
}
return u.entries[i-u.offset].Term, true
}
func (u *unstable) stableTo(i uint64) {
if i < u.offset || i+1-u.offset > uint64(len(u.entries)) {
log.Panicf("stableTo(%d) is out of range [unstable(%d), len(unstableEnts)(%d)]",
i, u.offset, len(u.entries))
}
u.entries = u.entries[i+1-u.offset:]
u.offset = i + 1
}
func (u *unstable) stableSnapTo(i uint64) {
if u.snapshot != nil && u.snapshot.Metadata.Index == i {
u.snapshot = nil
}
}
func (u *unstable) restore(s pb.Snapshot) {
u.offset = s.Metadata.Index + 1
u.entries = nil
u.snapshot = &s
}
func (u *unstable) resetEntries(offset uint64) {
u.entries = nil
u.offset = offset
}
func (u *unstable) truncateAndAppend(after uint64, ents []pb.Entry) {
if after < u.offset {
// The log is being truncated to before our current unstable
// portion, so discard it and reset unstable.
u.resetEntries(after + 1)
}
u.entries = append(u.slice(u.offset, after+1), ents...)
}
func (u *unstable) slice(lo uint64, hi uint64) []pb.Entry {
if lo >= hi {
return nil
}
if u.isOutOfBounds(lo) || u.isOutOfBounds(hi-1) {
return nil
}
return u.entries[lo-u.offset : hi-u.offset]
}
func (u *unstable) isOutOfBounds(i uint64) bool {
if len(u.entries) == 0 {
return true
}
last := u.offset + uint64(len(u.entries)) - 1
if i < u.offset || i > last {
return true
}
return false
}

View File

@ -306,9 +306,7 @@ func (n *node) run(r *raft) {
r.raftLog.stableTo(prevLastUnstablei)
havePrevLastUnstablei = false
}
if r.raftLog.unstable.snapshot != nil && r.raftLog.unstable.snapshot.Metadata.Index == prevSnapi {
r.raftLog.unstable.snapshot = nil
}
r.raftLog.stableSnapTo(prevSnapi)
advancec = nil
case <-n.stop:
close(n.done)