mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
raft: rename raftLog.nextEnts to raftLog.nextCommittedEnts
Also rename hasNextEnts to hasNextCommittedEnts. Also rename maxNextEntsSize to maxNextCommittedEntsSize. Pure refactor. Signed-off-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
This commit is contained in:
parent
0bff3ade4d
commit
3711fde822
26
raft/log.go
26
raft/log.go
@ -39,9 +39,9 @@ type raftLog struct {
|
||||
|
||||
logger Logger
|
||||
|
||||
// maxNextEntsSize is the maximum number aggregate byte size of the messages
|
||||
// returned from calls to nextEnts.
|
||||
maxNextEntsSize uint64
|
||||
// maxNextCommittedEntsSize is the maximum number aggregate byte size of the
|
||||
// messages returned from calls to nextCommittedEnts.
|
||||
maxNextCommittedEntsSize uint64
|
||||
}
|
||||
|
||||
// newLog returns log using the given storage and default options. It
|
||||
@ -53,14 +53,14 @@ func newLog(storage Storage, logger Logger) *raftLog {
|
||||
|
||||
// newLogWithSize returns a log using the given storage and max
|
||||
// message size.
|
||||
func newLogWithSize(storage Storage, logger Logger, maxNextEntsSize uint64) *raftLog {
|
||||
func newLogWithSize(storage Storage, logger Logger, maxNextCommittedEntsSize uint64) *raftLog {
|
||||
if storage == nil {
|
||||
log.Panic("storage must not be nil")
|
||||
}
|
||||
log := &raftLog{
|
||||
storage: storage,
|
||||
logger: logger,
|
||||
maxNextEntsSize: maxNextEntsSize,
|
||||
storage: storage,
|
||||
logger: logger,
|
||||
maxNextCommittedEntsSize: maxNextCommittedEntsSize,
|
||||
}
|
||||
firstIndex, err := storage.FirstIndex()
|
||||
if err != nil {
|
||||
@ -177,13 +177,13 @@ func (l *raftLog) unstableEntries() []pb.Entry {
|
||||
return l.unstable.entries
|
||||
}
|
||||
|
||||
// nextEnts returns all the available entries for execution.
|
||||
// nextCommittedEnts returns all the available entries for execution.
|
||||
// If applied is smaller than the index of snapshot, it returns all committed
|
||||
// entries after the index of snapshot.
|
||||
func (l *raftLog) nextEnts() (ents []pb.Entry) {
|
||||
func (l *raftLog) nextCommittedEnts() (ents []pb.Entry) {
|
||||
off := max(l.applied+1, l.firstIndex())
|
||||
if l.committed+1 > off {
|
||||
ents, err := l.slice(off, l.committed+1, l.maxNextEntsSize)
|
||||
ents, err := l.slice(off, l.committed+1, l.maxNextCommittedEntsSize)
|
||||
if err != nil {
|
||||
l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
|
||||
}
|
||||
@ -192,9 +192,9 @@ func (l *raftLog) nextEnts() (ents []pb.Entry) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// hasNextEnts returns if there is any available entries for execution. This
|
||||
// is a fast check without heavy raftLog.slice() in raftLog.nextEnts().
|
||||
func (l *raftLog) hasNextEnts() bool {
|
||||
// hasNextCommittedEnts returns if there is any available entries for execution.
|
||||
// This is a fast check without heavy raftLog.slice() in nextCommittedEnts().
|
||||
func (l *raftLog) hasNextCommittedEnts() bool {
|
||||
off := max(l.applied+1, l.firstIndex())
|
||||
return l.committed+1 > off
|
||||
}
|
||||
|
@ -339,7 +339,7 @@ func TestCompactionSideEffects(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestHasNextEnts(t *testing.T) {
|
||||
func TestHasNextCommittedEnts(t *testing.T) {
|
||||
snap := pb.Snapshot{
|
||||
Metadata: pb.SnapshotMetadata{Term: 1, Index: 3},
|
||||
}
|
||||
@ -365,14 +365,14 @@ func TestHasNextEnts(t *testing.T) {
|
||||
raftLog.maybeCommit(5, 1)
|
||||
raftLog.appliedTo(tt.applied)
|
||||
|
||||
hasNext := raftLog.hasNextEnts()
|
||||
hasNext := raftLog.hasNextCommittedEnts()
|
||||
if hasNext != tt.hasNext {
|
||||
t.Errorf("#%d: hasNext = %v, want %v", i, hasNext, tt.hasNext)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestNextEnts(t *testing.T) {
|
||||
func TestNextCommittedEnts(t *testing.T) {
|
||||
snap := pb.Snapshot{
|
||||
Metadata: pb.SnapshotMetadata{Term: 1, Index: 3},
|
||||
}
|
||||
@ -398,7 +398,7 @@ func TestNextEnts(t *testing.T) {
|
||||
raftLog.maybeCommit(5, 1)
|
||||
raftLog.appliedTo(tt.applied)
|
||||
|
||||
nents := raftLog.nextEnts()
|
||||
nents := raftLog.nextCommittedEnts()
|
||||
if !reflect.DeepEqual(nents, tt.wents) {
|
||||
t.Errorf("#%d: nents = %+v, want %+v", i, nents, tt.wents)
|
||||
}
|
||||
|
@ -568,7 +568,7 @@ func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {
|
||||
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
|
||||
rd := Ready{
|
||||
Entries: r.raftLog.unstableEntries(),
|
||||
CommittedEntries: r.raftLog.nextEnts(),
|
||||
CommittedEntries: r.raftLog.nextCommittedEnts(),
|
||||
Messages: r.msgs,
|
||||
}
|
||||
if softSt := r.softState(); !softSt.equal(prevSoftSt) {
|
||||
|
@ -952,8 +952,8 @@ func (s *ignoreSizeHintMemStorage) Entries(lo, hi uint64, maxSize uint64) ([]raf
|
||||
// internal one. The original bug was the following:
|
||||
//
|
||||
// - node learns that index 11 (or 100, doesn't matter) is committed
|
||||
// - nextEnts returns index 1..10 in CommittedEntries due to size limiting. However,
|
||||
// index 10 already exceeds maxBytes, due to a user-provided impl of Entries.
|
||||
// - nextCommittedEnts returns index 1..10 in CommittedEntries due to size limiting.
|
||||
// However, index 10 already exceeds maxBytes, due to a user-provided impl of Entries.
|
||||
// - Commit index gets bumped to 10
|
||||
// - the node persists the HardState, but crashes before applying the entries
|
||||
// - upon restart, the storage returns the same entries, but `slice` takes a different code path
|
||||
|
@ -450,8 +450,8 @@ func TestLeaderCommitEntry(t *testing.T) {
|
||||
t.Errorf("committed = %d, want %d", g, li+1)
|
||||
}
|
||||
wents := []pb.Entry{{Index: li + 1, Term: 1, Data: []byte("some data")}}
|
||||
if g := r.raftLog.nextEnts(); !reflect.DeepEqual(g, wents) {
|
||||
t.Errorf("nextEnts = %+v, want %+v", g, wents)
|
||||
if g := r.raftLog.nextCommittedEnts(); !reflect.DeepEqual(g, wents) {
|
||||
t.Errorf("nextCommittedEnts = %+v, want %+v", g, wents)
|
||||
}
|
||||
msgs := r.readMessages()
|
||||
sort.Sort(messageSlice(msgs))
|
||||
@ -538,7 +538,7 @@ func TestLeaderCommitPrecedingEntries(t *testing.T) {
|
||||
|
||||
li := uint64(len(tt))
|
||||
wents := append(tt, pb.Entry{Term: 3, Index: li + 1}, pb.Entry{Term: 3, Index: li + 2, Data: []byte("some data")})
|
||||
if g := r.raftLog.nextEnts(); !reflect.DeepEqual(g, wents) {
|
||||
if g := r.raftLog.nextCommittedEnts(); !reflect.DeepEqual(g, wents) {
|
||||
t.Errorf("#%d: ents = %+v, want %+v", i, g, wents)
|
||||
}
|
||||
}
|
||||
@ -590,8 +590,8 @@ func TestFollowerCommitEntry(t *testing.T) {
|
||||
t.Errorf("#%d: committed = %d, want %d", i, g, tt.commit)
|
||||
}
|
||||
wents := tt.ents[:int(tt.commit)]
|
||||
if g := r.raftLog.nextEnts(); !reflect.DeepEqual(g, wents) {
|
||||
t.Errorf("#%d: nextEnts = %v, want %v", i, g, wents)
|
||||
if g := r.raftLog.nextCommittedEnts(); !reflect.DeepEqual(g, wents) {
|
||||
t.Errorf("#%d: nextCommittedEnts = %v, want %v", i, g, wents)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -163,7 +163,7 @@ func (rn *RawNode) HasReady() bool {
|
||||
if r.raftLog.hasPendingSnapshot() {
|
||||
return true
|
||||
}
|
||||
if len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextEnts() {
|
||||
if len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextCommittedEnts() {
|
||||
return true
|
||||
}
|
||||
if len(r.readStates) != 0 {
|
||||
|
@ -885,8 +885,8 @@ func TestRawNodeStatus(t *testing.T) {
|
||||
// Raft group would forget to apply entries:
|
||||
//
|
||||
// - node learns that index 11 is committed
|
||||
// - nextEnts returns index 1..10 in CommittedEntries (but index 10 already
|
||||
// exceeds maxBytes), which isn't noticed internally by Raft
|
||||
// - nextCommittedEnts returns index 1..10 in CommittedEntries (but index 10
|
||||
// already exceeds maxBytes), which isn't noticed internally by Raft
|
||||
// - Commit index gets bumped to 10
|
||||
// - the node persists the HardState, but crashes before applying the entries
|
||||
// - upon restart, the storage returns the same entries, but `slice` takes a
|
||||
|
Loading…
x
Reference in New Issue
Block a user