raft: add BenchmarkRawNode

This is a speed-of-light benchmark that uses an in-memory single-voter
RawNode to sequentially propose and process entries.

As a bonus, it also measures the number of calls to the underlying
Storage. Calls to the Storage are cheap since the benchmark is in-
memory, but in a real-world implementation, especially one that doesn't
cache results, additional calls to the Storage interface can translate
to a heavy hit as they might involve additional I/O.

Signed-off-by: Tobias Grieger <tobias.b.grieger@gmail.com>
This commit is contained in:
Tobias Grieger 2022-09-07 10:06:30 +02:00
parent 3ad363d070
commit 5e3314da42
2 changed files with 99 additions and 0 deletions

View File

@ -1105,3 +1105,90 @@ func TestRawNodeConsumeReady(t *testing.T) {
t.Fatalf("expected only m2 in raft.msgs, got %+v", rn.raft.msgs)
}
}
func BenchmarkRawNode(b *testing.B) {
b.Run("single-voter", func(b *testing.B) {
benchmarkRawNodeImpl(b, 1)
})
b.Run("two-voters", func(b *testing.B) {
benchmarkRawNodeImpl(b, 1, 2)
})
}
func benchmarkRawNodeImpl(b *testing.B, peers ...uint64) {
const debug = false
s := newTestMemoryStorage(withPeers(peers...))
cfg := newTestConfig(1, 10, 1, s)
if !debug {
cfg.Logger = discardLogger // avoid distorting benchmark output
}
rn, err := NewRawNode(cfg)
if err != nil {
b.Fatal(err)
}
run := make(chan struct{}, 1)
defer close(run)
var numReady uint64
stabilize := func() (applied uint64) {
for rn.HasReady() {
numReady++
rd := rn.Ready()
if debug {
b.Log(DescribeReady(rd, nil))
}
if n := len(rd.CommittedEntries); n > 0 {
applied = rd.CommittedEntries[n-1].Index
}
s.Append(rd.Entries)
for _, m := range rd.Messages {
if m.Type == pb.MsgVote {
resp := pb.Message{To: m.From, From: m.To, Term: m.Term, Type: pb.MsgVoteResp}
if debug {
b.Log(DescribeMessage(resp, nil))
}
rn.Step(resp)
}
if m.Type == pb.MsgApp {
idx := m.Index
if n := len(m.Entries); n > 0 {
idx = m.Entries[n-1].Index
}
resp := pb.Message{To: m.From, From: m.To, Type: pb.MsgAppResp, Term: m.Term, Index: idx}
if debug {
b.Log(DescribeMessage(resp, nil))
}
rn.Step(resp)
}
}
rn.Advance(rd)
}
return applied
}
rn.Campaign()
stabilize()
if debug {
b.N = 1
}
var applied uint64
for i := 0; i < b.N; i++ {
if err := rn.Propose([]byte("foo")); err != nil {
b.Fatal(err)
}
applied = stabilize()
}
if applied < uint64(b.N) {
b.Fatalf("did not apply everything: %d < %d", applied, b.N)
}
b.ReportMetric(float64(s.callStats.firstIndex)/float64(b.N), "firstIndex/op")
b.ReportMetric(float64(s.callStats.lastIndex)/float64(b.N), "lastIndex/op")
b.ReportMetric(float64(s.callStats.term)/float64(b.N), "term/op")
b.ReportMetric(float64(numReady)/float64(b.N), "ready/op")
b.Logf("storage access stats: %+v", s.callStats)
}

View File

@ -71,6 +71,10 @@ type Storage interface {
Snapshot() (pb.Snapshot, error)
}
type inMemStorageCallStats struct {
initialState, firstIndex, lastIndex, entries, term, snapshot int
}
// MemoryStorage implements the Storage interface backed by an
// in-memory array.
type MemoryStorage struct {
@ -83,6 +87,8 @@ type MemoryStorage struct {
snapshot pb.Snapshot
// ents[i] has raft log position i+snapshot.Metadata.Index
ents []pb.Entry
callStats inMemStorageCallStats
}
// NewMemoryStorage creates an empty MemoryStorage.
@ -95,6 +101,7 @@ func NewMemoryStorage() *MemoryStorage {
// InitialState implements the Storage interface.
func (ms *MemoryStorage) InitialState() (pb.HardState, pb.ConfState, error) {
ms.callStats.initialState++
return ms.hardState, ms.snapshot.Metadata.ConfState, nil
}
@ -110,6 +117,7 @@ func (ms *MemoryStorage) SetHardState(st pb.HardState) error {
func (ms *MemoryStorage) Entries(lo, hi, maxSize uint64) ([]pb.Entry, error) {
ms.Lock()
defer ms.Unlock()
ms.callStats.entries++
offset := ms.ents[0].Index
if lo <= offset {
return nil, ErrCompacted
@ -130,6 +138,7 @@ func (ms *MemoryStorage) Entries(lo, hi, maxSize uint64) ([]pb.Entry, error) {
func (ms *MemoryStorage) Term(i uint64) (uint64, error) {
ms.Lock()
defer ms.Unlock()
ms.callStats.term++
offset := ms.ents[0].Index
if i < offset {
return 0, ErrCompacted
@ -144,6 +153,7 @@ func (ms *MemoryStorage) Term(i uint64) (uint64, error) {
func (ms *MemoryStorage) LastIndex() (uint64, error) {
ms.Lock()
defer ms.Unlock()
ms.callStats.lastIndex++
return ms.lastIndex(), nil
}
@ -155,6 +165,7 @@ func (ms *MemoryStorage) lastIndex() uint64 {
func (ms *MemoryStorage) FirstIndex() (uint64, error) {
ms.Lock()
defer ms.Unlock()
ms.callStats.firstIndex++
return ms.firstIndex(), nil
}
@ -166,6 +177,7 @@ func (ms *MemoryStorage) firstIndex() uint64 {
func (ms *MemoryStorage) Snapshot() (pb.Snapshot, error) {
ms.Lock()
defer ms.Unlock()
ms.callStats.snapshot++
return ms.snapshot, nil
}