mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #7513 from gyuho/raft-applied-term
etcdserver: remove possibly compacted entry look-up
This commit is contained in:
commit
5856c8bce9
@ -599,6 +599,7 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
|
||||
type etcdProgress struct {
|
||||
confState raftpb.ConfState
|
||||
snapi uint64
|
||||
appliedt uint64
|
||||
appliedi uint64
|
||||
}
|
||||
|
||||
@ -676,6 +677,7 @@ func (s *EtcdServer) run() {
|
||||
ep := etcdProgress{
|
||||
confState: sn.Metadata.ConfState,
|
||||
snapi: sn.Metadata.Index,
|
||||
appliedt: sn.Metadata.Term,
|
||||
appliedi: sn.Metadata.Index,
|
||||
}
|
||||
|
||||
@ -777,7 +779,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
|
||||
select {
|
||||
// snapshot requested via send()
|
||||
case m := <-s.r.msgSnapC:
|
||||
merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
|
||||
merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
|
||||
s.sendMergedSnap(merged)
|
||||
default:
|
||||
}
|
||||
@ -879,6 +881,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
|
||||
}
|
||||
plog.Info("finished adding peers from new cluster configuration into network...")
|
||||
|
||||
ep.appliedt = apply.snapshot.Metadata.Term
|
||||
ep.appliedi = apply.snapshot.Metadata.Index
|
||||
ep.snapi = ep.appliedi
|
||||
ep.confState = apply.snapshot.Metadata.ConfState
|
||||
@ -900,7 +903,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
|
||||
return
|
||||
}
|
||||
var shouldstop bool
|
||||
if ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
|
||||
if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
|
||||
go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
|
||||
}
|
||||
}
|
||||
@ -1254,9 +1257,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
|
||||
// apply takes entries received from Raft (after it has been committed) and
|
||||
// applies them to the current state of the EtcdServer.
|
||||
// The given entries should not be empty.
|
||||
func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint64, bool) {
|
||||
var applied uint64
|
||||
var shouldstop bool
|
||||
func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (appliedt uint64, appliedi uint64, shouldStop bool) {
|
||||
for i := range es {
|
||||
e := es[i]
|
||||
switch e.Type {
|
||||
@ -1266,16 +1267,17 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
|
||||
var cc raftpb.ConfChange
|
||||
pbutil.MustUnmarshal(&cc, e.Data)
|
||||
removedSelf, err := s.applyConfChange(cc, confState)
|
||||
shouldstop = shouldstop || removedSelf
|
||||
shouldStop = shouldStop || removedSelf
|
||||
s.w.Trigger(cc.ID, err)
|
||||
default:
|
||||
plog.Panicf("entry type should be either EntryNormal or EntryConfChange")
|
||||
}
|
||||
atomic.StoreUint64(&s.r.index, e.Index)
|
||||
atomic.StoreUint64(&s.r.term, e.Term)
|
||||
applied = e.Index
|
||||
appliedt = e.Term
|
||||
appliedi = e.Index
|
||||
}
|
||||
return applied, shouldstop
|
||||
return appliedt, appliedi, shouldStop
|
||||
}
|
||||
|
||||
// applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer
|
||||
|
@ -615,7 +615,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
|
||||
ents = append(ents, ent)
|
||||
}
|
||||
|
||||
_, shouldStop := srv.apply(ents, &raftpb.ConfState{})
|
||||
_, _, shouldStop := srv.apply(ents, &raftpb.ConfState{})
|
||||
if !shouldStop {
|
||||
t.Errorf("shouldStop = %t, want %t", shouldStop, true)
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ package etcdserver
|
||||
|
||||
import (
|
||||
"io"
|
||||
"log"
|
||||
|
||||
"github.com/coreos/etcd/mvcc/backend"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
@ -26,12 +25,7 @@ import (
|
||||
// createMergedSnapshotMessage creates a snapshot message that contains: raft status (term, conf),
|
||||
// a snapshot of v2 store inside raft.Snapshot as []byte, a snapshot of v3 KV in the top level message
|
||||
// as ReadCloser.
|
||||
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64, confState raftpb.ConfState) snap.Message {
|
||||
snapt, err := s.r.raftStorage.Term(snapi)
|
||||
if err != nil {
|
||||
log.Panicf("get term should never fail: %v", err)
|
||||
}
|
||||
|
||||
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message {
|
||||
// get a snapshot of v2 store as []byte
|
||||
clone := s.store.Clone()
|
||||
d, err := clone.SaveNoCopy()
|
||||
|
Loading…
x
Reference in New Issue
Block a user