diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index 15d57de9a..75197063b 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -259,18 +259,20 @@ func (c *Cluster) SetID(id types.ID) { c.id = id } func (c *Cluster) SetStore(st store.Store) { c.store = st } +// ValidateConfigurationChange takes a proposed ConfChange and +// ensures that it is still valid. func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { - appliedMembers, appliedRemoved := membersFromStore(c.store) - if appliedRemoved[types.ID(cc.NodeID)] { + members, removed := membersFromStore(c.store) + if removed[types.ID(cc.NodeID)] { return ErrIDRemoved } switch cc.Type { case raftpb.ConfChangeAddNode: - if appliedMembers[types.ID(cc.NodeID)] != nil { + if members[types.ID(cc.NodeID)] != nil { return ErrIDExists } urls := make(map[string]bool) - for _, m := range appliedMembers { + for _, m := range members { for _, u := range m.PeerURLs { urls[u] = true } @@ -285,7 +287,7 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { } } case raftpb.ConfChangeRemoveNode: - if appliedMembers[types.ID(cc.NodeID)] == nil { + if members[types.ID(cc.NodeID)] == nil { return ErrIDNotFound } default: diff --git a/etcdserver/server.go b/etcdserver/server.go index 34c648e76..2fbc28c52 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -333,7 +333,6 @@ func (s *EtcdServer) run() { // TODO(bmizerany): do this in the background, but take // care to apply entries in a single goroutine, and not // race them. - // TODO: apply configuration change into ClusterStore. if len(rd.CommittedEntries) != 0 { appliedi = s.apply(rd.CommittedEntries) } @@ -480,8 +479,9 @@ func (s *EtcdServer) Term() uint64 { return atomic.LoadUint64(&s.raftTerm) } -// configure sends configuration change through consensus then performs it. -// It will block until the change is performed or there is an error. +// configure sends a configuration change through consensus and +// then waits for it to be applied to the server. It +// will block until the change is performed or there is an error. func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error { ch := s.w.Register(cc.ID) if err := s.node.ProposeConfChange(ctx, cc); err != nil { @@ -567,6 +567,8 @@ func getExpirationTime(r *pb.Request) time.Time { return t } +// apply takes an Entry received from Raft (after it has been committed) and +// applies it to the current state of the EtcdServer func (s *EtcdServer) apply(es []raftpb.Entry) uint64 { var applied uint64 for i := range es { @@ -641,6 +643,8 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response { } } +// applyConfChange applies a ConfChange to the server. It is only +// invoked with a ConfChange that has already passed through Raft func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error { if err := s.Cluster.ValidateConfigurationChange(cc); err != nil { cc.NodeID = raft.None