mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #1895 from xiang90/snap_nodes
etcd: update conf when apply the confChange entry
This commit is contained in:
commit
e4c0f5c1a8
@ -395,7 +395,7 @@ func (s *EtcdServer) run() {
|
||||
// snapi indicates the index of the last submitted snapshot request
|
||||
snapi := snap.Metadata.Index
|
||||
appliedi := snap.Metadata.Index
|
||||
nodes := snap.Metadata.ConfState.Nodes
|
||||
confState := snap.Metadata.ConfState
|
||||
|
||||
defer func() {
|
||||
s.node.Stop()
|
||||
@ -412,7 +412,6 @@ func (s *EtcdServer) run() {
|
||||
case rd := <-s.node.Ready():
|
||||
if rd.SoftState != nil {
|
||||
atomic.StoreUint64(&s.raftLead, rd.SoftState.Lead)
|
||||
nodes = rd.SoftState.Nodes
|
||||
if rd.RaftState == raft.StateLeader {
|
||||
syncC = s.SyncTicker
|
||||
} else {
|
||||
@ -459,7 +458,7 @@ func (s *EtcdServer) run() {
|
||||
ents = rd.CommittedEntries[appliedi+1-firsti:]
|
||||
}
|
||||
if len(ents) > 0 {
|
||||
if appliedi, shouldstop = s.apply(ents); shouldstop {
|
||||
if appliedi, shouldstop = s.apply(ents, &confState); shouldstop {
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -469,7 +468,7 @@ func (s *EtcdServer) run() {
|
||||
|
||||
if appliedi-snapi > s.snapCount {
|
||||
log.Printf("etcdserver: start to snapshot (applied: %d, lastsnap: %d)", appliedi, snapi)
|
||||
s.snapshot(appliedi, nodes)
|
||||
s.snapshot(appliedi, &confState)
|
||||
snapi = appliedi
|
||||
}
|
||||
case <-syncC:
|
||||
@ -701,7 +700,7 @@ func getExpirationTime(r *pb.Request) time.Time {
|
||||
// apply takes entries received from Raft (after it has been committed) and
|
||||
// applies them to the current state of the EtcdServer.
|
||||
// The given entries should not be empty.
|
||||
func (s *EtcdServer) apply(es []raftpb.Entry) (uint64, bool) {
|
||||
func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint64, bool) {
|
||||
var applied uint64
|
||||
for i := range es {
|
||||
e := es[i]
|
||||
@ -713,7 +712,7 @@ func (s *EtcdServer) apply(es []raftpb.Entry) (uint64, bool) {
|
||||
case raftpb.EntryConfChange:
|
||||
var cc raftpb.ConfChange
|
||||
pbutil.MustUnmarshal(&cc, e.Data)
|
||||
shouldstop, err := s.applyConfChange(cc)
|
||||
shouldstop, err := s.applyConfChange(cc, confState)
|
||||
s.w.Trigger(cc.ID, err)
|
||||
if shouldstop {
|
||||
return applied, true
|
||||
@ -779,13 +778,13 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
|
||||
|
||||
// applyConfChange applies a ConfChange to the server. It is only
|
||||
// invoked with a ConfChange that has already passed through Raft
|
||||
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) (bool, error) {
|
||||
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) {
|
||||
if err := s.Cluster.ValidateConfigurationChange(cc); err != nil {
|
||||
cc.NodeID = raft.None
|
||||
s.node.ApplyConfChange(cc)
|
||||
return false, err
|
||||
}
|
||||
s.node.ApplyConfChange(cc)
|
||||
*confState = *s.node.ApplyConfChange(cc)
|
||||
switch cc.Type {
|
||||
case raftpb.ConfChangeAddNode:
|
||||
m := new(Member)
|
||||
@ -833,14 +832,14 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) (bool, error) {
|
||||
}
|
||||
|
||||
// TODO: non-blocking snapshot
|
||||
func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) {
|
||||
func (s *EtcdServer) snapshot(snapi uint64, confState *raftpb.ConfState) {
|
||||
d, err := s.store.Save()
|
||||
// TODO: current store will never fail to do a snapshot
|
||||
// what should we do if the store might fail?
|
||||
if err != nil {
|
||||
log.Panicf("etcdserver: store save should never fail: %v", err)
|
||||
}
|
||||
err = s.raftStorage.Compact(snapi, &raftpb.ConfState{Nodes: snapnodes}, d)
|
||||
err = s.raftStorage.Compact(snapi, confState, d)
|
||||
if err != nil {
|
||||
// the snapshot was done asynchronously with the progress of raft.
|
||||
// raft might have already got a newer snapshot and called compact.
|
||||
|
@ -475,7 +475,7 @@ func TestApplyConfChangeError(t *testing.T) {
|
||||
node: n,
|
||||
Cluster: cl,
|
||||
}
|
||||
_, err := srv.applyConfChange(tt.cc)
|
||||
_, err := srv.applyConfChange(tt.cc, nil)
|
||||
if err != tt.werr {
|
||||
t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr)
|
||||
}
|
||||
@ -509,7 +509,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
|
||||
NodeID: 2,
|
||||
}
|
||||
// remove non-local member
|
||||
shouldStop, err := srv.applyConfChange(cc)
|
||||
shouldStop, err := srv.applyConfChange(cc, &raftpb.ConfState{})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error %v", err)
|
||||
}
|
||||
@ -519,7 +519,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
|
||||
|
||||
// remove local member
|
||||
cc.NodeID = 1
|
||||
shouldStop, err = srv.applyConfChange(cc)
|
||||
shouldStop, err = srv.applyConfChange(cc, &raftpb.ConfState{})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error %v", err)
|
||||
}
|
||||
@ -917,7 +917,7 @@ func TestSnapshot(t *testing.T) {
|
||||
raftStorage: s,
|
||||
}
|
||||
|
||||
srv.snapshot(1, []uint64{1})
|
||||
srv.snapshot(1, &raftpb.ConfState{Nodes: []uint64{1}})
|
||||
gaction := st.Action()
|
||||
if len(gaction) != 1 {
|
||||
t.Fatalf("len(action) = %d, want 1", len(gaction))
|
||||
@ -1091,10 +1091,7 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) {
|
||||
func TestAddMember(t *testing.T) {
|
||||
n := newNodeConfChangeCommitterRecorder()
|
||||
n.readyc <- raft.Ready{
|
||||
SoftState: &raft.SoftState{
|
||||
RaftState: raft.StateLeader,
|
||||
Nodes: []uint64{2345, 3456},
|
||||
},
|
||||
SoftState: &raft.SoftState{RaftState: raft.StateLeader},
|
||||
}
|
||||
cl := newTestCluster(nil)
|
||||
cl.SetStore(store.New())
|
||||
@ -1128,10 +1125,7 @@ func TestAddMember(t *testing.T) {
|
||||
func TestRemoveMember(t *testing.T) {
|
||||
n := newNodeConfChangeCommitterRecorder()
|
||||
n.readyc <- raft.Ready{
|
||||
SoftState: &raft.SoftState{
|
||||
RaftState: raft.StateLeader,
|
||||
Nodes: []uint64{1234, 2345, 3456},
|
||||
},
|
||||
SoftState: &raft.SoftState{RaftState: raft.StateLeader},
|
||||
}
|
||||
cl := newTestCluster(nil)
|
||||
cl.SetStore(store.New())
|
||||
@ -1165,10 +1159,7 @@ func TestRemoveMember(t *testing.T) {
|
||||
func TestUpdateMember(t *testing.T) {
|
||||
n := newNodeConfChangeCommitterRecorder()
|
||||
n.readyc <- raft.Ready{
|
||||
SoftState: &raft.SoftState{
|
||||
RaftState: raft.StateLeader,
|
||||
Nodes: []uint64{1234, 2345, 3456},
|
||||
},
|
||||
SoftState: &raft.SoftState{RaftState: raft.StateLeader},
|
||||
}
|
||||
cl := newTestCluster(nil)
|
||||
cl.SetStore(store.New())
|
||||
@ -1575,7 +1566,7 @@ func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil }
|
||||
func (n *nodeRecorder) Advance() {}
|
||||
func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState {
|
||||
n.record(action{name: "ApplyConfChange", params: []interface{}{conf}})
|
||||
return nil
|
||||
return &raftpb.ConfState{}
|
||||
}
|
||||
func (n *nodeRecorder) Stop() {
|
||||
n.record(action{name: "Stop"})
|
||||
@ -1639,7 +1630,7 @@ func (n *nodeConfChangeCommitterRecorder) Ready() <-chan raft.Ready {
|
||||
}
|
||||
func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState {
|
||||
n.record(action{name: "ApplyConfChange:" + conf.Type.String()})
|
||||
return nil
|
||||
return &raftpb.ConfState{}
|
||||
}
|
||||
|
||||
type waitWithResponse struct {
|
||||
|
@ -19,7 +19,6 @@ package raft
|
||||
import (
|
||||
"errors"
|
||||
"log"
|
||||
"reflect"
|
||||
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
pb "github.com/coreos/etcd/raft/raftpb"
|
||||
@ -37,11 +36,10 @@ var (
|
||||
type SoftState struct {
|
||||
Lead uint64
|
||||
RaftState StateType
|
||||
Nodes []uint64
|
||||
}
|
||||
|
||||
func (a *SoftState) equal(b *SoftState) bool {
|
||||
return reflect.DeepEqual(a, b)
|
||||
return a.Lead == b.Lead && a.RaftState == b.RaftState
|
||||
}
|
||||
|
||||
// Ready encapsulates the entries and messages that are ready to read,
|
||||
|
@ -305,7 +305,7 @@ func TestNodeStart(t *testing.T) {
|
||||
}
|
||||
wants := []Ready{
|
||||
{
|
||||
SoftState: &SoftState{Lead: 1, Nodes: []uint64{1}, RaftState: StateLeader},
|
||||
SoftState: &SoftState{Lead: 1, RaftState: StateLeader},
|
||||
HardState: raftpb.HardState{Term: 2, Commit: 2},
|
||||
Entries: []raftpb.Entry{
|
||||
{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
|
||||
@ -446,7 +446,6 @@ func TestSoftStateEqual(t *testing.T) {
|
||||
{&SoftState{}, true},
|
||||
{&SoftState{Lead: 1}, false},
|
||||
{&SoftState{RaftState: StateLeader}, false},
|
||||
{&SoftState{Nodes: []uint64{1, 2}}, false},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
if g := tt.st.equal(&SoftState{}); g != tt.we {
|
||||
|
@ -175,9 +175,7 @@ func (r *raft) hasLeader() bool { return r.lead != None }
|
||||
|
||||
func (r *raft) leader() uint64 { return r.lead }
|
||||
|
||||
func (r *raft) softState() *SoftState {
|
||||
return &SoftState{Lead: r.lead, RaftState: r.state, Nodes: r.nodes()}
|
||||
}
|
||||
func (r *raft) softState() *SoftState { return &SoftState{Lead: r.lead, RaftState: r.state} }
|
||||
|
||||
func (r *raft) q() int { return len(r.prs)/2 + 1 }
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user