From 5e3314da42377b50d390f87c9c908130238d0fae Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 7 Sep 2022 10:06:30 +0200 Subject: [PATCH] raft: add BenchmarkRawNode This is a speed-of-light benchmark that uses an in-memory single-voter RawNode to sequentially propose and process entries. As a bonus, it also measures the number of calls to the underlying Storage. Calls to the Storage are cheap since the benchmark is in- memory, but in a real-world implementation, especially one that doesn't cache results, additional calls to the Storage interface can translate to a heavy hit as they might involve additional I/O. Signed-off-by: Tobias Grieger --- raft/rawnode_test.go | 87 ++++++++++++++++++++++++++++++++++++++++++++ raft/storage.go | 12 ++++++ 2 files changed, 99 insertions(+) diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index 535152e14..413f232df 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -1105,3 +1105,90 @@ func TestRawNodeConsumeReady(t *testing.T) { t.Fatalf("expected only m2 in raft.msgs, got %+v", rn.raft.msgs) } } + +func BenchmarkRawNode(b *testing.B) { + b.Run("single-voter", func(b *testing.B) { + benchmarkRawNodeImpl(b, 1) + }) + b.Run("two-voters", func(b *testing.B) { + benchmarkRawNodeImpl(b, 1, 2) + }) +} + +func benchmarkRawNodeImpl(b *testing.B, peers ...uint64) { + + const debug = false + + s := newTestMemoryStorage(withPeers(peers...)) + cfg := newTestConfig(1, 10, 1, s) + if !debug { + cfg.Logger = discardLogger // avoid distorting benchmark output + } + rn, err := NewRawNode(cfg) + if err != nil { + b.Fatal(err) + } + + run := make(chan struct{}, 1) + defer close(run) + + var numReady uint64 + stabilize := func() (applied uint64) { + for rn.HasReady() { + numReady++ + rd := rn.Ready() + if debug { + b.Log(DescribeReady(rd, nil)) + } + if n := len(rd.CommittedEntries); n > 0 { + applied = rd.CommittedEntries[n-1].Index + } + s.Append(rd.Entries) + for _, m := range rd.Messages { + if m.Type == pb.MsgVote { + resp := pb.Message{To: m.From, From: m.To, Term: m.Term, Type: pb.MsgVoteResp} + if debug { + b.Log(DescribeMessage(resp, nil)) + } + rn.Step(resp) + } + if m.Type == pb.MsgApp { + idx := m.Index + if n := len(m.Entries); n > 0 { + idx = m.Entries[n-1].Index + } + resp := pb.Message{To: m.From, From: m.To, Type: pb.MsgAppResp, Term: m.Term, Index: idx} + if debug { + b.Log(DescribeMessage(resp, nil)) + } + rn.Step(resp) + } + } + rn.Advance(rd) + } + return applied + } + + rn.Campaign() + stabilize() + + if debug { + b.N = 1 + } + + var applied uint64 + for i := 0; i < b.N; i++ { + if err := rn.Propose([]byte("foo")); err != nil { + b.Fatal(err) + } + applied = stabilize() + } + if applied < uint64(b.N) { + b.Fatalf("did not apply everything: %d < %d", applied, b.N) + } + b.ReportMetric(float64(s.callStats.firstIndex)/float64(b.N), "firstIndex/op") + b.ReportMetric(float64(s.callStats.lastIndex)/float64(b.N), "lastIndex/op") + b.ReportMetric(float64(s.callStats.term)/float64(b.N), "term/op") + b.ReportMetric(float64(numReady)/float64(b.N), "ready/op") + b.Logf("storage access stats: %+v", s.callStats) +} diff --git a/raft/storage.go b/raft/storage.go index 8b16d4fa2..67ec16b13 100644 --- a/raft/storage.go +++ b/raft/storage.go @@ -71,6 +71,10 @@ type Storage interface { Snapshot() (pb.Snapshot, error) } +type inMemStorageCallStats struct { + initialState, firstIndex, lastIndex, entries, term, snapshot int +} + // MemoryStorage implements the Storage interface backed by an // in-memory array. type MemoryStorage struct { @@ -83,6 +87,8 @@ type MemoryStorage struct { snapshot pb.Snapshot // ents[i] has raft log position i+snapshot.Metadata.Index ents []pb.Entry + + callStats inMemStorageCallStats } // NewMemoryStorage creates an empty MemoryStorage. @@ -95,6 +101,7 @@ func NewMemoryStorage() *MemoryStorage { // InitialState implements the Storage interface. func (ms *MemoryStorage) InitialState() (pb.HardState, pb.ConfState, error) { + ms.callStats.initialState++ return ms.hardState, ms.snapshot.Metadata.ConfState, nil } @@ -110,6 +117,7 @@ func (ms *MemoryStorage) SetHardState(st pb.HardState) error { func (ms *MemoryStorage) Entries(lo, hi, maxSize uint64) ([]pb.Entry, error) { ms.Lock() defer ms.Unlock() + ms.callStats.entries++ offset := ms.ents[0].Index if lo <= offset { return nil, ErrCompacted @@ -130,6 +138,7 @@ func (ms *MemoryStorage) Entries(lo, hi, maxSize uint64) ([]pb.Entry, error) { func (ms *MemoryStorage) Term(i uint64) (uint64, error) { ms.Lock() defer ms.Unlock() + ms.callStats.term++ offset := ms.ents[0].Index if i < offset { return 0, ErrCompacted @@ -144,6 +153,7 @@ func (ms *MemoryStorage) Term(i uint64) (uint64, error) { func (ms *MemoryStorage) LastIndex() (uint64, error) { ms.Lock() defer ms.Unlock() + ms.callStats.lastIndex++ return ms.lastIndex(), nil } @@ -155,6 +165,7 @@ func (ms *MemoryStorage) lastIndex() uint64 { func (ms *MemoryStorage) FirstIndex() (uint64, error) { ms.Lock() defer ms.Unlock() + ms.callStats.firstIndex++ return ms.firstIndex(), nil } @@ -166,6 +177,7 @@ func (ms *MemoryStorage) firstIndex() uint64 { func (ms *MemoryStorage) Snapshot() (pb.Snapshot, error) { ms.Lock() defer ms.Unlock() + ms.callStats.snapshot++ return ms.snapshot, nil }