diff --git a/etcdserver/auth/auth.go b/etcdserver/auth/auth.go index 7d27e74c2..93f8909dd 100644 --- a/etcdserver/auth/auth.go +++ b/etcdserver/auth/auth.go @@ -22,6 +22,7 @@ import ( "reflect" "sort" "strings" + "sync" "time" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" @@ -93,7 +94,9 @@ type store struct { server doer timeout time.Duration ensuredOnce bool - enabled *bool + + mu sync.Mutex // protect enabled + enabled *bool } type User struct { @@ -377,6 +380,9 @@ func (s *store) UpdateRole(role Role) (Role, error) { } func (s *store) AuthEnabled() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.detectAuth() } @@ -384,6 +390,10 @@ func (s *store) EnableAuth() error { if s.AuthEnabled() { return authErr(http.StatusConflict, "already enabled") } + + s.mu.Lock() + defer s.mu.Unlock() + _, err := s.GetUser("root") if err != nil { return authErr(http.StatusConflict, "No root user available, please create one") @@ -412,6 +422,10 @@ func (s *store) DisableAuth() error { if !s.AuthEnabled() { return authErr(http.StatusConflict, "already disabled") } + + s.mu.Lock() + defer s.mu.Unlock() + err := s.disableAuth() if err == nil { b := false diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index 087b8122b..2b8a8f096 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -220,6 +220,9 @@ func (c *cluster) SetID(id types.ID) { c.id = id } func (c *cluster) SetStore(st store.Store) { c.store = st } func (c *cluster) Recover() { + c.Lock() + defer c.Unlock() + c.members, c.removed = membersFromStore(c.store) c.version = clusterVersionFromStore(c.store) MustDetectDowngrade(c.version) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index cfe69c3f6..f663cf888 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -52,6 +52,8 @@ const ( ) var ( + // protects raftStatus + raftStatusMu sync.Mutex // indirection for expvar func interface // expvar panics when publishing duplicate name // expvar does not support remove a registered name @@ -62,7 +64,11 @@ var ( func init() { raft.SetLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "raft")) - expvar.Publish("raft.status", expvar.Func(func() interface{} { return raftStatus() })) + expvar.Publish("raft.status", expvar.Func(func() interface{} { + raftStatusMu.Lock() + defer raftStatusMu.Unlock() + return raftStatus() + })) } type RaftTimer interface { @@ -273,7 +279,9 @@ func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n r MaxInflightMsgs: maxInflightMsgs, } n = raft.StartNode(c, peers) + raftStatusMu.Lock() raftStatus = n.Status + raftStatusMu.Unlock() advanceTicksForElection(n, c.ElectionTick) return } @@ -303,7 +311,9 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *clust MaxInflightMsgs: maxInflightMsgs, } n := raft.RestartNode(c) + raftStatusMu.Lock() raftStatus = n.Status + raftStatusMu.Unlock() advanceTicksForElection(n, c.ElectionTick) return id, cl, n, s, w } diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 32ac5f764..39c1622bb 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -151,7 +151,10 @@ func (t *transport) Send(msgs []raftpb.Message) { t.maybeUpdatePeersTerm(m.Term) } + t.mu.RLock() p, ok := t.peers[to] + t.mu.RUnlock() + if ok { if m.Type == raftpb.MsgApp { t.serverStats.SendAppendReq(m.Size())