etcdserver: add docstrings for confchanges

This commit is contained in:
Jonathan Boulle 2014-11-06 12:09:21 -08:00
parent bf47fe7cac
commit 5055863e09
2 changed files with 14 additions and 8 deletions

View File

@ -259,18 +259,20 @@ func (c *Cluster) SetID(id types.ID) { c.id = id }
func (c *Cluster) SetStore(st store.Store) { c.store = st } func (c *Cluster) SetStore(st store.Store) { c.store = st }
// ValidateConfigurationChange takes a proposed ConfChange and
// ensures that it is still valid.
func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
appliedMembers, appliedRemoved := membersFromStore(c.store) members, removed := membersFromStore(c.store)
if appliedRemoved[types.ID(cc.NodeID)] { if removed[types.ID(cc.NodeID)] {
return ErrIDRemoved return ErrIDRemoved
} }
switch cc.Type { switch cc.Type {
case raftpb.ConfChangeAddNode: case raftpb.ConfChangeAddNode:
if appliedMembers[types.ID(cc.NodeID)] != nil { if members[types.ID(cc.NodeID)] != nil {
return ErrIDExists return ErrIDExists
} }
urls := make(map[string]bool) urls := make(map[string]bool)
for _, m := range appliedMembers { for _, m := range members {
for _, u := range m.PeerURLs { for _, u := range m.PeerURLs {
urls[u] = true urls[u] = true
} }
@ -285,7 +287,7 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
} }
} }
case raftpb.ConfChangeRemoveNode: case raftpb.ConfChangeRemoveNode:
if appliedMembers[types.ID(cc.NodeID)] == nil { if members[types.ID(cc.NodeID)] == nil {
return ErrIDNotFound return ErrIDNotFound
} }
default: default:

View File

@ -333,7 +333,6 @@ func (s *EtcdServer) run() {
// TODO(bmizerany): do this in the background, but take // TODO(bmizerany): do this in the background, but take
// care to apply entries in a single goroutine, and not // care to apply entries in a single goroutine, and not
// race them. // race them.
// TODO: apply configuration change into ClusterStore.
if len(rd.CommittedEntries) != 0 { if len(rd.CommittedEntries) != 0 {
appliedi = s.apply(rd.CommittedEntries) appliedi = s.apply(rd.CommittedEntries)
} }
@ -480,8 +479,9 @@ func (s *EtcdServer) Term() uint64 {
return atomic.LoadUint64(&s.raftTerm) return atomic.LoadUint64(&s.raftTerm)
} }
// configure sends configuration change through consensus then performs it. // configure sends a configuration change through consensus and
// It will block until the change is performed or there is an error. // then waits for it to be applied to the server. It
// will block until the change is performed or there is an error.
func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error { func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error {
ch := s.w.Register(cc.ID) ch := s.w.Register(cc.ID)
if err := s.node.ProposeConfChange(ctx, cc); err != nil { if err := s.node.ProposeConfChange(ctx, cc); err != nil {
@ -567,6 +567,8 @@ func getExpirationTime(r *pb.Request) time.Time {
return t return t
} }
// apply takes an Entry received from Raft (after it has been committed) and
// applies it to the current state of the EtcdServer
func (s *EtcdServer) apply(es []raftpb.Entry) uint64 { func (s *EtcdServer) apply(es []raftpb.Entry) uint64 {
var applied uint64 var applied uint64
for i := range es { for i := range es {
@ -641,6 +643,8 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
} }
} }
// applyConfChange applies a ConfChange to the server. It is only
// invoked with a ConfChange that has already passed through Raft
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error { func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error {
if err := s.Cluster.ValidateConfigurationChange(cc); err != nil { if err := s.Cluster.ValidateConfigurationChange(cc); err != nil {
cc.NodeID = raft.None cc.NodeID = raft.None