From f18ae033a7e7b69fb351afb4d2a6a24b7bbd4aa1 Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Wed, 22 Mar 2017 13:59:02 -0700 Subject: [PATCH] raft: use rs.req.Entries[0].Data as the key for deletion in advance() advance() should use rs.req.Entries[0].Data as the context instead of req.Context for deletion. Since req.Context is never set, there won't be any context being deleted from pendingReadIndex; results mem leak. FIXES #7571 --- raft/raft_test.go | 49 +++++++++++++++++++++++++++++++++++++++++++++++ raft/read_only.go | 2 +- 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/raft/raft_test.go b/raft/raft_test.go index 56b364c8c..2dffe7acc 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -1246,6 +1246,55 @@ func TestHandleHeartbeatResp(t *testing.T) { } } +// TestRaftFreesReadOnlyMem ensures raft will free read request from +// readOnly readIndexQueue and pendingReadIndex map. +// related issue: https://github.com/coreos/etcd/issues/7571 +func TestRaftFreesReadOnlyMem(t *testing.T) { + sm := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) + sm.becomeCandidate() + sm.becomeLeader() + sm.raftLog.commitTo(sm.raftLog.lastIndex()) + + ctx := []byte("ctx") + + // leader starts linearizable read request. + // more info: raft dissertation 6.4, step 2. + sm.Step(pb.Message{From: 2, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: ctx}}}) + msgs := sm.readMessages() + if len(msgs) != 1 { + t.Fatalf("len(msgs) = %d, want 1", len(msgs)) + } + if msgs[0].Type != pb.MsgHeartbeat { + t.Fatalf("type = %v, want MsgHeartbeat", msgs[0].Type) + } + if !bytes.Equal(msgs[0].Context, ctx) { + t.Fatalf("Context = %v, want %v", msgs[0].Context, ctx) + } + if len(sm.readOnly.readIndexQueue) != 1 { + t.Fatalf("len(readIndexQueue) = %v, want 1", len(sm.readOnly.readIndexQueue)) + } + if len(sm.readOnly.pendingReadIndex) != 1 { + t.Fatalf("len(pendingReadIndex) = %v, want 1", len(sm.readOnly.pendingReadIndex)) + } + if _, ok := sm.readOnly.pendingReadIndex[string(ctx)]; !ok { + t.Fatalf("can't find context %v in pendingReadIndex ", ctx) + } + + // heartbeat responses from majority of followers (1 in this case) + // acknowledge the authority of the leader. + // more info: raft dissertation 6.4, step 3. + sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp, Context: ctx}) + if len(sm.readOnly.readIndexQueue) != 0 { + t.Fatalf("len(readIndexQueue) = %v, want 0", len(sm.readOnly.readIndexQueue)) + } + if len(sm.readOnly.pendingReadIndex) != 0 { + t.Fatalf("len(pendingReadIndex) = %v, want 0", len(sm.readOnly.pendingReadIndex)) + } + if _, ok := sm.readOnly.pendingReadIndex[string(ctx)]; ok { + t.Fatalf("found context %v in pendingReadIndex, want none", ctx) + } +} + // TestMsgAppRespWaitReset verifies the resume behavior of a leader // MsgAppResp. func TestMsgAppRespWaitReset(t *testing.T) { diff --git a/raft/read_only.go b/raft/read_only.go index 05a21dabd..d0085237e 100644 --- a/raft/read_only.go +++ b/raft/read_only.go @@ -100,7 +100,7 @@ func (ro *readOnly) advance(m pb.Message) []*readIndexStatus { if found { ro.readIndexQueue = ro.readIndexQueue[i:] for _, rs := range rss { - delete(ro.pendingReadIndex, string(rs.req.Context)) + delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data)) } return rss }