diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index a176c4d8f..15d57de9a 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -27,6 +27,7 @@ import ( "reflect" "sort" "strings" + "sync" etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/pkg/flags" @@ -58,6 +59,7 @@ type Cluster struct { // removed id cannot be reused. removed map[types.ID]bool store store.Store + sync.Mutex } // NewClusterFromString returns Cluster through given cluster token and parsing @@ -112,9 +114,11 @@ func newCluster(token string) *Cluster { } } -func (c Cluster) ID() types.ID { return c.id } +func (c *Cluster) ID() types.ID { return c.id } -func (c Cluster) Members() []*Member { +func (c *Cluster) Members() []*Member { + c.Lock() + defer c.Unlock() var sms SortableMemberSlice for _, m := range c.members { sms = append(sms, m) @@ -130,12 +134,16 @@ func (s SortableMemberSlice) Less(i, j int) bool { return s[i].ID < s[j].ID } func (s SortableMemberSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (c *Cluster) Member(id types.ID) *Member { + c.Lock() + defer c.Unlock() return c.members[id] } // MemberByName returns a Member with the given name if exists. // If more than one member has the given name, it will panic. func (c *Cluster) MemberByName(name string) *Member { + c.Lock() + defer c.Unlock() var memb *Member for _, m := range c.members { if m.Name == name { @@ -148,7 +156,9 @@ func (c *Cluster) MemberByName(name string) *Member { return memb } -func (c Cluster) MemberIDs() []types.ID { +func (c *Cluster) MemberIDs() []types.ID { + c.Lock() + defer c.Unlock() var ids []types.ID for _, m := range c.members { ids = append(ids, m.ID) @@ -158,13 +168,17 @@ func (c Cluster) MemberIDs() []types.ID { } func (c *Cluster) IsIDRemoved(id types.ID) bool { + c.Lock() + defer c.Unlock() return c.removed[id] } // PeerURLs returns a list of all peer addresses. Each address is prefixed // with the scheme (currently "http://"). The returned list is sorted in // ascending lexicographical order. -func (c Cluster) PeerURLs() []string { +func (c *Cluster) PeerURLs() []string { + c.Lock() + defer c.Unlock() endpoints := make([]string, 0) for _, p := range c.members { for _, addr := range p.PeerURLs { @@ -178,7 +192,9 @@ func (c Cluster) PeerURLs() []string { // ClientURLs returns a list of all client addresses. Each address is prefixed // with the scheme (currently "http://"). The returned list is sorted in // ascending lexicographical order. -func (c Cluster) ClientURLs() []string { +func (c *Cluster) ClientURLs() []string { + c.Lock() + defer c.Unlock() urls := make([]string, 0) for _, p := range c.members { for _, url := range p.ClientURLs { @@ -189,7 +205,9 @@ func (c Cluster) ClientURLs() []string { return urls } -func (c Cluster) String() string { +func (c *Cluster) String() string { + c.Lock() + defer c.Unlock() sl := []string{} for _, m := range c.members { for _, u := range m.PeerURLs { @@ -279,6 +297,8 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { // AddMember puts a new Member into the store. // A Member with a matching id must not exist. func (c *Cluster) AddMember(m *Member) { + c.Lock() + defer c.Unlock() b, err := json.Marshal(m.RaftAttributes) if err != nil { log.Panicf("marshal raftAttributes should never fail: %v", err) @@ -301,6 +321,8 @@ func (c *Cluster) AddMember(m *Member) { // RemoveMember removes a member from the store. // The given id MUST exist, or the function panics. func (c *Cluster) RemoveMember(id types.ID) { + c.Lock() + defer c.Unlock() if _, err := c.store.Delete(memberStoreKey(id), true, true); err != nil { log.Panicf("delete member should never fail: %v", err) } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index b5a9486fc..b52e8cf8c 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -1382,31 +1382,6 @@ func (w *waitWithResponse) Register(id uint64) <-chan interface{} { } func (w *waitWithResponse) Trigger(id uint64, x interface{}) {} -type clusterStoreRecorder struct { - recorder -} - -func (cs *clusterStoreRecorder) Add(m Member) { - cs.record(action{name: "Add", params: []interface{}{m}}) -} -func (cs *clusterStoreRecorder) Get() Cluster { - cs.record(action{name: "Get"}) - return Cluster{} -} -func (cs *clusterStoreRecorder) Remove(id uint64) { - cs.record(action{name: "Remove", params: []interface{}{id}}) -} -func (cs *clusterStoreRecorder) IsRemoved(id uint64) bool { return false } - -type removedClusterStore struct { - removed map[uint64]bool -} - -func (cs *removedClusterStore) Add(m Member) {} -func (cs *removedClusterStore) Get() Cluster { return Cluster{} } -func (cs *removedClusterStore) Remove(id uint64) {} -func (cs *removedClusterStore) IsRemoved(id uint64) bool { return cs.removed[id] } - type nopSender struct{} func (s *nopSender) Send(m []raftpb.Message) {}