diff --git a/raft/log.go b/raft/log.go index bd7edea5b..4da815a49 100644 --- a/raft/log.go +++ b/raft/log.go @@ -136,7 +136,7 @@ func (l *raftLog) unstableEntries() []pb.Entry { func (l *raftLog) nextEnts() (ents []pb.Entry) { off := max(l.applied+1, l.firstIndex()) if l.committed+1 > off { - return l.slice(off, l.committed+1) + return l.slice(off, l.committed+1, noLimit) } return nil } @@ -217,15 +217,15 @@ func (l *raftLog) term(i uint64) uint64 { panic(err) // TODO(bdarnell) } -func (l *raftLog) entries(i uint64) []pb.Entry { +func (l *raftLog) entries(i, maxsize uint64) []pb.Entry { if i > l.lastIndex() { return nil } - return l.slice(i, l.lastIndex()+1) + return l.slice(i, l.lastIndex()+1, maxsize) } // allEntries returns all entries in the log. -func (l *raftLog) allEntries() []pb.Entry { return l.entries(l.firstIndex()) } +func (l *raftLog) allEntries() []pb.Entry { return l.entries(l.firstIndex(), noLimit) } // isUpToDate determines if the given (lastIndex,term) log is more up-to-date // by comparing the index and term of the last entries in the existing logs. @@ -254,14 +254,14 @@ func (l *raftLog) restore(s pb.Snapshot) { } // slice returns a slice of log entries from lo through hi-1, inclusive. -func (l *raftLog) slice(lo uint64, hi uint64) []pb.Entry { +func (l *raftLog) slice(lo, hi, maxSize uint64) []pb.Entry { l.mustCheckOutOfBounds(lo, hi) if lo == hi { return nil } var ents []pb.Entry if lo < l.unstable.offset { - storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset)) + storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset), maxSize) if err == ErrCompacted { // This should never fail because it has been checked before. 
raftLogger.Panicf("entries[%d:%d) from storage is out of bound", lo, min(hi, l.unstable.offset)) @@ -270,6 +270,12 @@ func (l *raftLog) slice(lo uint64, hi uint64) []pb.Entry { } else if err != nil { panic(err) // TODO(bdarnell) } + + // check if ents has reached the size limitation + if uint64(len(storedEnts)) < min(hi, l.unstable.offset)-lo { + return storedEnts + } + ents = storedEnts } if hi > l.unstable.offset { @@ -281,7 +287,7 @@ func (l *raftLog) slice(lo uint64, hi uint64) []pb.Entry { ents = unstable } } - return ents + return limitSize(ents, maxSize) } // l.firstIndex <= lo <= hi <= l.firstIndex + len(l.entries) diff --git a/raft/log_test.go b/raft/log_test.go index 3aef304bf..88fe6ae61 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -132,7 +132,7 @@ func TestAppend(t *testing.T) { if index != tt.windex { t.Errorf("#%d: lastIndex = %d, want %d", i, index, tt.windex) } - if g := raftLog.entries(1); !reflect.DeepEqual(g, tt.wents) { + if g := raftLog.entries(1, noLimit); !reflect.DeepEqual(g, tt.wents) { t.Errorf("#%d: logEnts = %+v, want %+v", i, g, tt.wents) } if g := raftLog.unstable.offset; g != tt.wunstable { @@ -257,7 +257,7 @@ func TestLogMaybeAppend(t *testing.T) { t.Errorf("#%d: committed = %d, want %d", i, gcommit, tt.wcommit) } if gappend && len(tt.ents) != 0 { - gents := raftLog.slice(raftLog.lastIndex()-uint64(len(tt.ents))+1, raftLog.lastIndex()+1) + gents := raftLog.slice(raftLog.lastIndex()-uint64(len(tt.ents))+1, raftLog.lastIndex()+1, noLimit) if !reflect.DeepEqual(tt.ents, gents) { t.Errorf("%d: appended entries = %v, want %v", i, gents, tt.ents) } @@ -322,7 +322,7 @@ func TestCompactionSideEffects(t *testing.T) { t.Errorf("lastIndex = %d, want = %d", raftLog.lastIndex(), prev+1) } - ents := raftLog.entries(raftLog.lastIndex()) + ents := raftLog.entries(raftLog.lastIndex(), noLimit) if len(ents) != 1 { t.Errorf("len(entries) = %d, want = %d", len(ents), 1) } @@ -691,25 +691,43 @@ func TestSlice(t *testing.T) { var i uint64 offset 
:= uint64(100) num := uint64(100) + last := offset + num + half := offset + num/2 + halfe := pb.Entry{Index: half, Term: half} storage := NewMemoryStorage() storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset}}) + for i = 1; i < num/2; i++ { + storage.Append([]pb.Entry{{Index: offset + i, Term: offset + i}}) + } l := newLog(storage) - for i = 1; i < num; i++ { + for i = num / 2; i < num; i++ { l.append(pb.Entry{Index: offset + i, Term: offset + i}) } tests := []struct { - from uint64 - to uint64 + from uint64 + to uint64 + limit uint64 + w []pb.Entry wpanic bool }{ - {offset - 1, offset + 1, nil, true}, - {offset, offset + 1, nil, true}, - {offset + num/2, offset + num/2 + 1, []pb.Entry{{Index: offset + num/2, Term: offset + num/2}}, false}, - {offset + num - 1, offset + num, []pb.Entry{{Index: offset + num - 1, Term: offset + num - 1}}, false}, - {offset + num, offset + num + 1, nil, true}, + // test no limit + {offset - 1, offset + 1, noLimit, nil, true}, + {offset, offset + 1, noLimit, nil, true}, + {half - 1, half + 1, noLimit, []pb.Entry{{Index: half - 1, Term: half - 1}, {Index: half, Term: half}}, false}, + {half, half + 1, noLimit, []pb.Entry{{Index: half, Term: half}}, false}, + {last - 1, last, noLimit, []pb.Entry{{Index: last - 1, Term: last - 1}}, false}, + {last, last + 1, noLimit, nil, true}, + + // test limit + {half - 1, half + 1, 0, []pb.Entry{{Index: half - 1, Term: half - 1}}, false}, + {half - 1, half + 1, uint64(halfe.Size() + 1), []pb.Entry{{Index: half - 1, Term: half - 1}}, false}, + {half - 1, half + 1, uint64(halfe.Size() * 2), []pb.Entry{{Index: half - 1, Term: half - 1}, {Index: half, Term: half}}, false}, + {half - 1, half + 2, uint64(halfe.Size() * 3), []pb.Entry{{Index: half - 1, Term: half - 1}, {Index: half, Term: half}, {Index: half + 1, Term: half + 1}}, false}, + {half, half + 2, uint64(halfe.Size()), []pb.Entry{{Index: half, Term: half}}, false}, + {half, half + 2, uint64(halfe.Size() * 2), 
[]pb.Entry{{Index: half, Term: half}, {Index: half + 1, Term: half + 1}}, false}, } for j, tt := range tests { @@ -721,7 +739,7 @@ func TestSlice(t *testing.T) { } } }() - g := l.slice(tt.from, tt.to) + g := l.slice(tt.from, tt.to, tt.limit) if !reflect.DeepEqual(g, tt.w) { t.Errorf("#%d: from %d to %d = %v, want %v", j, tt.from, tt.to, g, tt.w) } diff --git a/raft/multinode_test.go b/raft/multinode_test.go index 2db3195d9..f5ab35683 100644 --- a/raft/multinode_test.go +++ b/raft/multinode_test.go @@ -140,7 +140,7 @@ func TestMultiNodePropose(t *testing.T) { if err != nil { t.Fatal(err) } - entries, err := s.Entries(lastIndex, lastIndex+1) + entries, err := s.Entries(lastIndex, lastIndex+1, noLimit) if err != nil { t.Fatal(err) } @@ -191,7 +191,7 @@ func TestMultiNodeProposeConfig(t *testing.T) { } mn.Stop() - entries, err := s.Entries(lastIndex, lastIndex+1) + entries, err := s.Entries(lastIndex, lastIndex+1, noLimit) if err != nil { t.Fatal(err) } diff --git a/raft/raft.go b/raft/raft.go index f74a9a640..d1e86ea71 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -17,6 +17,7 @@ package raft import ( "errors" "fmt" + "math" "math/rand" "sort" "strings" @@ -26,6 +27,7 @@ import ( // None is a placeholder node ID used when there is no leader. const None uint64 = 0 +const noLimit = math.MaxUint64 var errNoLeader = errors.New("no leader") @@ -189,7 +191,8 @@ type raft struct { // the log raftLog *raftLog - prs map[uint64]*Progress + maxMsgSize uint64 + prs map[uint64]*Progress state StateType @@ -231,9 +234,13 @@ func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage peers = cs.Nodes } r := &raft{ - id: id, - lead: None, - raftLog: raftlog, + id: id, + lead: None, + raftLog: raftlog, + // 4MB for now and hard code it + // TODO(xiang): add a config argument into newRaft after we add + // the max inflight message field. 
+ maxMsgSize: 4 * 1024 * 1024, prs: make(map[uint64]*Progress), electionTimeout: election, heartbeatTimeout: heartbeat, @@ -314,7 +321,7 @@ func (r *raft) sendAppend(to uint64) { m.Type = pb.MsgApp m.Index = pr.Next - 1 m.LogTerm = r.raftLog.term(pr.Next - 1) - m.Entries = r.raftLog.entries(pr.Next) + m.Entries = r.raftLog.entries(pr.Next, r.maxMsgSize) m.Commit = r.raftLog.committed if n := len(m.Entries); n != 0 { switch pr.State { @@ -463,7 +470,7 @@ func (r *raft) becomeLeader() { r.tick = r.tickHeartbeat r.lead = r.id r.state = StateLeader - for _, e := range r.raftLog.entries(r.raftLog.committed + 1) { + for _, e := range r.raftLog.entries(r.raftLog.committed+1, noLimit) { if e.Type != pb.EntryConfChange { continue } diff --git a/raft/raft_test.go b/raft/raft_test.go index 3e4c78b84..ed552e31e 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -1629,7 +1629,7 @@ func TestStepIgnoreConfig(t *testing.T) { pendingConf := r.pendingConf r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) wents := []pb.Entry{{Type: pb.EntryNormal, Term: 1, Index: 3, Data: nil}} - if ents := r.raftLog.entries(index + 1); !reflect.DeepEqual(ents, wents) { + if ents := r.raftLog.entries(index+1, noLimit); !reflect.DeepEqual(ents, wents) { t.Errorf("ents = %+v, want %+v", ents, wents) } if r.pendingConf != pendingConf { diff --git a/raft/storage.go b/raft/storage.go index c4cfe7f20..d57cb7cab 100644 --- a/raft/storage.go +++ b/raft/storage.go @@ -41,7 +41,9 @@ type Storage interface { // InitialState returns the saved HardState and ConfState information. InitialState() (pb.HardState, pb.ConfState, error) // Entries returns a slice of log entries in the range [lo,hi). - Entries(lo, hi uint64) ([]pb.Entry, error) + // MaxSize limits the total size of the log entries returned, but + // Entries returns at least one entry if any. 
+ Entries(lo, hi, maxSize uint64) ([]pb.Entry, error) // Term returns the term of entry i, which must be in the range // [FirstIndex()-1, LastIndex()]. The term of the entry before // FirstIndex is retained for matching purposes even though the @@ -92,7 +94,7 @@ func (ms *MemoryStorage) SetHardState(st pb.HardState) error { } // Entries implements the Storage interface. -func (ms *MemoryStorage) Entries(lo, hi uint64) ([]pb.Entry, error) { +func (ms *MemoryStorage) Entries(lo, hi, maxSize uint64) ([]pb.Entry, error) { ms.Lock() defer ms.Unlock() offset := ms.ents[0].Index @@ -106,7 +108,9 @@ func (ms *MemoryStorage) Entries(lo, hi uint64) ([]pb.Entry, error) { if len(ms.ents) == 1 { return nil, ErrUnavailable } - return ms.ents[lo-offset : hi-offset], nil + + ents := ms.ents[lo-offset : hi-offset] + return limitSize(ents, maxSize), nil } // Term implements the Storage interface. diff --git a/raft/storage_test.go b/raft/storage_test.go index 8e235d125..d782b0320 100644 --- a/raft/storage_test.go +++ b/raft/storage_test.go @@ -15,6 +15,7 @@ package raft import ( + "math" "reflect" "testing" @@ -50,22 +51,32 @@ func TestStorageTerm(t *testing.T) { } func TestStorageEntries(t *testing.T) { - ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}} + ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}, {Index: 6, Term: 6}} tests := []struct { - lo, hi uint64 + lo, hi, maxsize uint64 werr error wentries []pb.Entry }{ - {2, 6, ErrCompacted, nil}, - {3, 4, ErrCompacted, nil}, - {4, 5, nil, []pb.Entry{{Index: 4, Term: 4}}}, - {4, 6, nil, []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}}}, + {2, 6, math.MaxUint64, ErrCompacted, nil}, + {3, 4, math.MaxUint64, ErrCompacted, nil}, + {4, 5, math.MaxUint64, nil, []pb.Entry{{Index: 4, Term: 4}}}, + {4, 6, math.MaxUint64, nil, []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}}}, + {4, 7, math.MaxUint64, nil, []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}, {Index: 6, 
Term: 6}}}, + // even if maxsize is zero, the first entry should be returned + {4, 7, 0, nil, []pb.Entry{{Index: 4, Term: 4}}}, + // limit to 2 + {4, 7, uint64(ents[1].Size() + ents[2].Size()), nil, []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}}}, + // limit to 2 + {4, 7, uint64(ents[1].Size() + ents[2].Size() + ents[3].Size()/2), nil, []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}}}, + {4, 7, uint64(ents[1].Size() + ents[2].Size() + ents[3].Size() - 1), nil, []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}}}, + // all + {4, 7, uint64(ents[1].Size() + ents[2].Size() + ents[3].Size()), nil, []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}, {Index: 6, Term: 6}}}, } for i, tt := range tests { s := &MemoryStorage{ents: ents} - entries, err := s.Entries(tt.lo, tt.hi) + entries, err := s.Entries(tt.lo, tt.hi, tt.maxsize) if err != tt.werr { t.Errorf("#%d: err = %v, want %v", i, err, tt.werr) } diff --git a/raft/util.go b/raft/util.go index 6e512fd4d..21e986edc 100644 --- a/raft/util.go +++ b/raft/util.go @@ -93,3 +93,18 @@ func DescribeEntry(e pb.Entry, f EntryFormatter) string { } return fmt.Sprintf("%d/%d %s %s", e.Term, e.Index, e.Type, formatted) } + +func limitSize(ents []pb.Entry, maxSize uint64) []pb.Entry { + if len(ents) == 0 { + return ents + } + size := ents[0].Size() + var limit int + for limit = 1; limit < len(ents); limit++ { + size += ents[limit].Size() + if uint64(size) > maxSize { + break + } + } + return ents[:limit] +} diff --git a/raft/util_test.go b/raft/util_test.go index f2849a921..920879df9 100644 --- a/raft/util_test.go +++ b/raft/util_test.go @@ -15,6 +15,8 @@ package raft import ( + "math" + "reflect" "strings" "testing" @@ -43,3 +45,28 @@ func TestDescribeEntry(t *testing.T) { t.Errorf("unexpected custom output: %s", customFormatted) } } + +func TestLimitSize(t *testing.T) { + ents := []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}, {Index: 6, Term: 6}} + tests := []struct { + maxsize uint64 + wentries []pb.Entry + }{ + 
{math.MaxUint64, []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}, {Index: 6, Term: 6}}}, + // even if maxsize is zero, the first entry should be returned + {0, []pb.Entry{{Index: 4, Term: 4}}}, + // limit to 2 + {uint64(ents[0].Size() + ents[1].Size()), []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}}}, + // limit to 2 + {uint64(ents[0].Size() + ents[1].Size() + ents[2].Size()/2), []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}}}, + {uint64(ents[0].Size() + ents[1].Size() + ents[2].Size() - 1), []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}}}, + // all + {uint64(ents[0].Size() + ents[1].Size() + ents[2].Size()), []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}, {Index: 6, Term: 6}}}, + } + + for i, tt := range tests { + if !reflect.DeepEqual(limitSize(ents, tt.maxsize), tt.wentries) { + t.Errorf("#%d: entries = %v, want %v", i, limitSize(ents, tt.maxsize), tt.wentries) + } + } +}