mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: move config validation to cluster
This commit is contained in:
parent
98406af448
commit
99b1af40c6
@ -31,6 +31,7 @@ import (
|
||||
etcdErr "github.com/coreos/etcd/error"
|
||||
"github.com/coreos/etcd/pkg/flags"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/store"
|
||||
)
|
||||
|
||||
@ -239,6 +240,27 @@ func (c *Cluster) SetID(id types.ID) { c.id = id }
|
||||
|
||||
func (c *Cluster) SetStore(st store.Store) { c.store = st }
|
||||
|
||||
func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
|
||||
appliedMembers, appliedRemoved := membersFromStore(c.store)
|
||||
|
||||
if appliedRemoved[types.ID(cc.NodeID)] {
|
||||
return ErrIDRemoved
|
||||
}
|
||||
switch cc.Type {
|
||||
case raftpb.ConfChangeAddNode:
|
||||
if appliedMembers[types.ID(cc.NodeID)] != nil {
|
||||
return ErrIDExists
|
||||
}
|
||||
case raftpb.ConfChangeRemoveNode:
|
||||
if appliedMembers[types.ID(cc.NodeID)] == nil {
|
||||
return ErrIDNotFound
|
||||
}
|
||||
default:
|
||||
log.Panicf("ConfChange type should be either AddNode or RemoveNode")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddMember puts a new Member into the store.
|
||||
// A Member with a matching id must not exist.
|
||||
func (c *Cluster) AddMember(m *Member) {
|
||||
|
@ -93,13 +93,11 @@ func TestClusterFromStore(t *testing.T) {
|
||||
},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
st := store.New()
|
||||
hc := newTestCluster(nil)
|
||||
hc.SetStore(st)
|
||||
for _, m := range tt.mems {
|
||||
hc.AddMember(&m)
|
||||
}
|
||||
c := NewClusterFromStore("abc", st)
|
||||
c := NewClusterFromStore("abc", hc.store)
|
||||
if c.token != "abc" {
|
||||
t.Errorf("#%d: token = %v, want %v", i, c.token, "abc")
|
||||
}
|
||||
@ -535,8 +533,9 @@ func TestNodeToMember(t *testing.T) {
|
||||
|
||||
func newTestCluster(membs []Member) *Cluster {
|
||||
c := &Cluster{members: make(map[types.ID]*Member), removed: make(map[types.ID]bool)}
|
||||
for i, m := range membs {
|
||||
c.members[m.ID] = &membs[i]
|
||||
c.store = store.New()
|
||||
for i := range membs {
|
||||
c.AddMember(&membs[i])
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
@ -45,7 +45,6 @@ type Member struct {
|
||||
ID types.ID `json:"id"`
|
||||
RaftAttributes
|
||||
Attributes
|
||||
verified bool
|
||||
}
|
||||
|
||||
// newMember creates a Member without an ID and generates one based on the
|
||||
|
@ -328,7 +328,7 @@ func (s *EtcdServer) run() {
|
||||
// race them.
|
||||
// TODO: apply configuration change into ClusterStore.
|
||||
if len(rd.CommittedEntries) != 0 {
|
||||
appliedi = s.apply(rd.CommittedEntries, nodes)
|
||||
appliedi = s.apply(rd.CommittedEntries)
|
||||
}
|
||||
|
||||
if rd.Snapshot.Index > snapi {
|
||||
@ -559,7 +559,7 @@ func getExpirationTime(r *pb.Request) time.Time {
|
||||
return t
|
||||
}
|
||||
|
||||
func (s *EtcdServer) apply(es []raftpb.Entry, nodes []uint64) uint64 {
|
||||
func (s *EtcdServer) apply(es []raftpb.Entry) uint64 {
|
||||
var applied uint64
|
||||
for i := range es {
|
||||
e := es[i]
|
||||
@ -571,7 +571,7 @@ func (s *EtcdServer) apply(es []raftpb.Entry, nodes []uint64) uint64 {
|
||||
case raftpb.EntryConfChange:
|
||||
var cc raftpb.ConfChange
|
||||
pbutil.MustUnmarshal(&cc, e.Data)
|
||||
s.w.Trigger(cc.ID, s.applyConfChange(cc, nodes))
|
||||
s.w.Trigger(cc.ID, s.applyConfChange(cc))
|
||||
default:
|
||||
log.Panicf("entry type should be either EntryNormal or EntryConfChange")
|
||||
}
|
||||
@ -633,8 +633,8 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes []uint64) error {
|
||||
if err := s.checkConfChange(cc, nodes); err != nil {
|
||||
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error {
|
||||
if err := s.Cluster.ValidateConfigurationChange(cc); err != nil {
|
||||
cc.NodeID = raft.None
|
||||
s.node.ApplyConfChange(cc)
|
||||
return err
|
||||
@ -659,25 +659,6 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes []uint64) error
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *EtcdServer) checkConfChange(cc raftpb.ConfChange, nodes []uint64) error {
|
||||
if s.Cluster.IsIDRemoved(types.ID(cc.NodeID)) {
|
||||
return ErrIDRemoved
|
||||
}
|
||||
switch cc.Type {
|
||||
case raftpb.ConfChangeAddNode:
|
||||
if containsUint64(nodes, cc.NodeID) {
|
||||
return ErrIDExists
|
||||
}
|
||||
case raftpb.ConfChangeRemoveNode:
|
||||
if !containsUint64(nodes, cc.NodeID) {
|
||||
return ErrIDNotFound
|
||||
}
|
||||
default:
|
||||
log.Panicf("ConfChange type should be either AddNode or RemoveNode")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO: non-blocking snapshot
|
||||
func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) {
|
||||
d, err := s.store.Save()
|
||||
|
@ -421,8 +421,13 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
|
||||
|
||||
// TODO: test ErrIDRemoved
|
||||
func TestApplyConfChangeError(t *testing.T) {
|
||||
nodes := []uint64{1, 2, 3}
|
||||
removed := map[types.ID]bool{4: true}
|
||||
cl := newCluster("")
|
||||
cl.SetStore(store.New())
|
||||
for i := 1; i <= 4; i++ {
|
||||
cl.AddMember(&Member{ID: types.ID(i)})
|
||||
}
|
||||
cl.RemoveMember(4)
|
||||
|
||||
tests := []struct {
|
||||
cc raftpb.ConfChange
|
||||
werr error
|
||||
@ -458,12 +463,11 @@ func TestApplyConfChangeError(t *testing.T) {
|
||||
}
|
||||
for i, tt := range tests {
|
||||
n := &nodeRecorder{}
|
||||
cl := &Cluster{removed: removed}
|
||||
srv := &EtcdServer{
|
||||
node: n,
|
||||
Cluster: cl,
|
||||
}
|
||||
err := srv.applyConfChange(tt.cc, nodes)
|
||||
err := srv.applyConfChange(tt.cc)
|
||||
if err != tt.werr {
|
||||
t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr)
|
||||
}
|
||||
@ -506,11 +510,12 @@ func testServer(t *testing.T, ns uint64) {
|
||||
n := raft.StartNode(id, members, 10, 1)
|
||||
tk := time.NewTicker(10 * time.Millisecond)
|
||||
defer tk.Stop()
|
||||
st := store.New()
|
||||
cl := newCluster("abc")
|
||||
cl.SetStore(&storeRecorder{})
|
||||
cl.SetStore(st)
|
||||
srv := &EtcdServer{
|
||||
node: n,
|
||||
store: store.New(),
|
||||
store: st,
|
||||
send: send,
|
||||
storage: &storageRecorder{},
|
||||
Ticker: tk.C,
|
||||
@ -536,8 +541,8 @@ func testServer(t *testing.T, ns uint64) {
|
||||
|
||||
g, w := resp.Event.Node, &store.NodeExtern{
|
||||
Key: "/foo",
|
||||
ModifiedIndex: uint64(i),
|
||||
CreatedIndex: uint64(i),
|
||||
ModifiedIndex: uint64(i) + 2*ns,
|
||||
CreatedIndex: uint64(i) + 2*ns,
|
||||
Value: stringp("bar"),
|
||||
}
|
||||
|
||||
@ -576,7 +581,7 @@ func TestDoProposal(t *testing.T) {
|
||||
// this makes <-tk always successful, which accelerates internal clock
|
||||
close(tk)
|
||||
cl := newCluster("abc")
|
||||
cl.SetStore(&storeRecorder{})
|
||||
cl.SetStore(store.New())
|
||||
srv := &EtcdServer{
|
||||
node: n,
|
||||
store: st,
|
||||
@ -833,13 +838,15 @@ func TestTriggerSnap(t *testing.T) {
|
||||
n.Campaign(ctx)
|
||||
st := &storeRecorder{}
|
||||
p := &storageRecorder{}
|
||||
cl := newCluster("abc")
|
||||
cl.SetStore(store.New())
|
||||
s := &EtcdServer{
|
||||
store: st,
|
||||
send: func(_ []raftpb.Message) {},
|
||||
storage: p,
|
||||
node: n,
|
||||
snapCount: 10,
|
||||
Cluster: &Cluster{},
|
||||
Cluster: cl,
|
||||
}
|
||||
|
||||
s.start()
|
||||
@ -928,7 +935,7 @@ func TestAddMember(t *testing.T) {
|
||||
},
|
||||
}
|
||||
cl := newTestCluster(nil)
|
||||
cl.SetStore(&storeRecorder{})
|
||||
cl.SetStore(store.New())
|
||||
s := &EtcdServer{
|
||||
node: n,
|
||||
store: &storeRecorder{},
|
||||
@ -964,7 +971,6 @@ func TestRemoveMember(t *testing.T) {
|
||||
},
|
||||
}
|
||||
cl := newTestCluster([]Member{{ID: 1234}})
|
||||
cl.SetStore(&storeRecorder{})
|
||||
s := &EtcdServer{
|
||||
node: n,
|
||||
store: &storeRecorder{},
|
||||
|
Loading…
x
Reference in New Issue
Block a user