From 2faf72f47c2ce1633ed93035473fb7a9ecc771a4 Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Thu, 22 Dec 2016 12:03:49 -0800 Subject: [PATCH] etcdserver: rework update committed index logic --- etcdserver/raft.go | 15 +++++++++++++++ etcdserver/server.go | 19 ++++++++----------- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 50b574211..eae398cce 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -182,6 +182,8 @@ func (r *raftNode) start(rh *raftReadyHandler) { raftDone: raftDone, } + updateCommittedIndex(&ap, rh) + select { case r.applyc <- ap: case <-r.stopped: @@ -231,6 +233,19 @@ func (r *raftNode) start(rh *raftReadyHandler) { }() } +func updateCommittedIndex(ap *apply, rh *raftReadyHandler) { + var ci uint64 + if len(ap.entries) != 0 { + ci = ap.entries[len(ap.entries)-1].Index + } + if ap.snapshot.Metadata.Index > ci { + ci = ap.snapshot.Metadata.Index + } + if ci != 0 { + rh.updateCommittedIndex(ci) + } +} + func (r *raftNode) sendMessages(ms []raftpb.Message) { sentAppResp := false for i := len(ms) - 1; i >= 0; i-- { diff --git a/etcdserver/server.go b/etcdserver/server.go index 806c0c3de..d28908eee 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -598,7 +598,8 @@ type etcdProgress struct { // and helps decouple state machine logic from Raft algorithms. // TODO: add a state machine interface to apply the commit entries and do snapshot/recover type raftReadyHandler struct { - leadershipUpdate func() + leadershipUpdate func() + updateCommittedIndex func(uint64) } func (s *EtcdServer) run() { @@ -648,6 +649,12 @@ func (s *EtcdServer) run() { s.r.td.Reset() } }, + updateCommittedIndex: func(ci uint64) { + cci := s.getCommittedIndex() + if ci > cci { + s.setCommittedIndex(ci) + } + }, } s.r.start(rh) @@ -701,16 +708,6 @@ func (s *EtcdServer) run() { for { select { case ap := <-s.r.apply(): - var ci uint64 - if len(ap.entries) != 0 { - ci = ap.entries[len(ap.entries)-1].Index - } - if ap.snapshot.Metadata.Index > ci { - ci = ap.snapshot.Metadata.Index - } - if ci != 0 { - s.setCommittedIndex(ci) - } f := func(context.Context) { s.applyAll(&ep, &ap) } sched.Schedule(f) case leases := <-expiredLeaseC: