diff --git a/raft/log.go b/raft/log.go index a6c851d6e..9954c92ee 100644 --- a/raft/log.go +++ b/raft/log.go @@ -41,18 +41,6 @@ type raftLog struct { applied uint64 } -// unstable.entris[i] has raft log position i+unstable.offset. -// Note that unstable.offset may be less than the highest log -// position in storage; this means that the next write to storage -// might need to truncate the log before persisting unstable.entries. -type unstable struct { - // the incoming unstable snapshot, if any. - snapshot *pb.Snapshot - // all entries that have not yet been written to storage. - entries []pb.Entry - offset uint64 -} - // newLog returns log using the given storage. It recovers the log to the state // that it just commits and applies the lastest snapshot. func newLog(storage Storage) *raftLog { @@ -106,15 +94,7 @@ func (l *raftLog) append(after uint64, ents ...pb.Entry) uint64 { if after < l.committed { log.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed) } - if after < l.unstable.offset { - // The log is being truncated to before our current unstable - // portion, so discard it and reset unstable. - l.unstable.entries = nil - l.unstable.offset = after + 1 - } - // Truncate any unstable entries that are being replaced, then - // append the new ones. - l.unstable.entries = append(l.unstable.entries[:after+1-l.unstable.offset], ents...) + l.unstable.truncateAndAppend(after, ents) return l.lastIndex() } @@ -166,8 +146,8 @@ func (l *raftLog) snapshot() (pb.Snapshot, error) { } func (l *raftLog) firstIndex() uint64 { - if l.unstable.snapshot != nil { - return l.unstable.snapshot.Metadata.Index + 1 + if i, ok := l.unstable.maybeFirstIndex(); ok { + return i } index, err := l.storage.FirstIndex() if err != nil { @@ -177,7 +157,14 @@ func (l *raftLog) firstIndex() uint64 { } func (l *raftLog) lastIndex() uint64 { - return l.unstable.offset + uint64(len(l.unstable.entries)) - 1 + if i, ok := l.unstable.maybeLastIndex(); ok { + return i + } + i, err := l.storage.LastIndex() + if err != nil { + panic(err) // TODO(bdarnell) + } + return i } func (l *raftLog) commitTo(tocommit uint64) { @@ -201,12 +188,11 @@ func (l *raftLog) appliedTo(i uint64) { } func (l *raftLog) stableTo(i uint64) { - if i < l.unstable.offset || i+1-l.unstable.offset > uint64(len(l.unstable.entries)) { - log.Panicf("stableTo(%d) is out of range [unstable(%d), len(unstableEnts)(%d)]", - i, l.unstable.offset, len(l.unstable.entries)) - } - l.unstable.entries = l.unstable.entries[i+1-l.unstable.offset:] - l.unstable.offset = i + 1 + l.unstable.stableTo(i) +} + +func (l *raftLog) stableSnapTo(i uint64) { + l.unstable.stableSnapTo(i) } func (l *raftLog) lastTerm() uint64 { @@ -214,28 +200,22 @@ func (l *raftLog) lastTerm() uint64 { } func (l *raftLog) term(i uint64) uint64 { - switch { - case i > l.lastIndex(): + if i > l.lastIndex() { return 0 - case i < l.unstable.offset: - if snap := l.unstable.snapshot; snap != nil { - if i == snap.Metadata.Index { - return snap.Metadata.Term - } - return 0 - } - t, err := l.storage.Term(i) - switch err { - case nil: - return t - case ErrCompacted: - return 0 - default: - panic(err) // TODO(bdarnell) - } - default: - return l.unstable.entries[i-l.unstable.offset].Term } + + if t, ok := l.unstable.maybeTerm(i); ok { + return t + } + + t, err := l.storage.Term(i) + if err == nil { + return t + } + if err == ErrCompacted { + return 0 + } + panic(err) // TODO(bdarnell) } func (l *raftLog) entries(i uint64) []pb.Entry { @@ -271,9 +251,7 @@ func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { func (l *raftLog) restore(s pb.Snapshot) { l.committed = s.Metadata.Index - l.unstable.offset = l.committed + 1 - l.unstable.entries = nil - l.unstable.snapshot = &s + l.unstable.restore(s) } // slice returns a slice of log entries from lo through hi-1, inclusive. @@ -297,8 +275,8 @@ func (l *raftLog) slice(lo uint64, hi uint64) []pb.Entry { ents = append(ents, storedEnts...) } if hi > l.unstable.offset { - firstUnstable := max(lo, l.unstable.offset) - ents = append(ents, l.unstable.entries[firstUnstable-l.unstable.offset:hi-l.unstable.offset]...) + unstable := l.unstable.slice(max(lo, l.unstable.offset), hi) + ents = append(ents, unstable...) } return ents } diff --git a/raft/log_unstable.go b/raft/log_unstable.go new file mode 100644 index 000000000..1c50cddca --- /dev/null +++ b/raft/log_unstable.go @@ -0,0 +1,134 @@ +/* + Copyright 2014 CoreOS, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package raft + +import ( + "log" + + pb "github.com/coreos/etcd/raft/raftpb" +) + +// unstable.entris[i] has raft log position i+unstable.offset. +// Note that unstable.offset may be less than the highest log +// position in storage; this means that the next write to storage +// might need to truncate the log before persisting unstable.entries. +type unstable struct { + // the incoming unstable snapshot, if any. + snapshot *pb.Snapshot + // all entries that have not yet been written to storage. + entries []pb.Entry + offset uint64 +} + +// maybeFirstIndex returns the first index if it has a snapshot. +func (u *unstable) maybeFirstIndex() (uint64, bool) { + if u.snapshot != nil { + return u.snapshot.Metadata.Index, true + } + return 0, false +} + +// maybeLastIndex returns the last index if it has at least one +// unstable entry or snapshot. +func (u *unstable) maybeLastIndex() (uint64, bool) { + if l := len(u.entries); l != 0 { + return u.offset + uint64(l) - 1, true + } + if u.snapshot != nil { + return u.snapshot.Metadata.Index, true + } + return 0, false +} + +// myabeTerm returns the term of the entry at index i, if there +// is any. +func (u *unstable) maybeTerm(i uint64) (uint64, bool) { + if i < u.offset { + if u.snapshot == nil { + return 0, false + } + if u.snapshot.Metadata.Index == i { + return u.snapshot.Metadata.Term, true + } + return 0, false + } + + last, ok := u.maybeLastIndex() + if !ok { + return 0, false + } + if i > last { + return 0, false + } + return u.entries[i-u.offset].Term, true +} + +func (u *unstable) stableTo(i uint64) { + if i < u.offset || i+1-u.offset > uint64(len(u.entries)) { + log.Panicf("stableTo(%d) is out of range [unstable(%d), len(unstableEnts)(%d)]", + i, u.offset, len(u.entries)) + } + u.entries = u.entries[i+1-u.offset:] + u.offset = i + 1 +} + +func (u *unstable) stableSnapTo(i uint64) { + if u.snapshot != nil && u.snapshot.Metadata.Index == i { + u.snapshot = nil + } +} + +func (u *unstable) restore(s pb.Snapshot) { + u.offset = s.Metadata.Index + 1 + u.entries = nil + u.snapshot = &s +} + +func (u *unstable) resetEntries(offset uint64) { + u.entries = nil + u.offset = offset +} + +func (u *unstable) truncateAndAppend(after uint64, ents []pb.Entry) { + if after < u.offset { + // The log is being truncated to before our current unstable + // portion, so discard it and reset unstable. + u.resetEntries(after + 1) + } + u.entries = append(u.slice(u.offset, after+1), ents...) +} + +func (u *unstable) slice(lo uint64, hi uint64) []pb.Entry { + if lo >= hi { + return nil + } + if u.isOutOfBounds(lo) || u.isOutOfBounds(hi-1) { + return nil + } + return u.entries[lo-u.offset : hi-u.offset] +} + +func (u *unstable) isOutOfBounds(i uint64) bool { + if len(u.entries) == 0 { + return true + } + last := u.offset + uint64(len(u.entries)) - 1 + if i < u.offset || i > last { + return true + } + return false +} diff --git a/raft/node.go b/raft/node.go index 51c2bcb18..b81d77773 100644 --- a/raft/node.go +++ b/raft/node.go @@ -306,9 +306,7 @@ func (n *node) run(r *raft) { r.raftLog.stableTo(prevLastUnstablei) havePrevLastUnstablei = false } - if r.raftLog.unstable.snapshot != nil && r.raftLog.unstable.snapshot.Metadata.Index == prevSnapi { - r.raftLog.unstable.snapshot = nil - } + r.raftLog.stableSnapTo(prevSnapi) advancec = nil case <-n.stop: close(n.done)