etcdserver, api, membership: don't race on setting version

Fixes #6029
This commit is contained in:
Anthony Romano 2016-07-26 11:33:11 -07:00 committed by Gyu-Ho Lee
parent 00bdd907d5
commit e5a5e5f7c6
6 changed files with 25 additions and 39 deletions

View File

@ -16,9 +16,7 @@ package api
import ( import (
"sync" "sync"
"time"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/version" "github.com/coreos/etcd/version"
"github.com/coreos/go-semver/semver" "github.com/coreos/go-semver/semver"
"github.com/coreos/pkg/capnslog" "github.com/coreos/pkg/capnslog"
@ -43,45 +41,32 @@ var (
"3.0.0": {AuthCapability: true, V3rpcCapability: true}, "3.0.0": {AuthCapability: true, V3rpcCapability: true},
} }
// capLoopOnce ensures we only create one capability monitor goroutine
capLoopOnce sync.Once
enableMapMu sync.RWMutex enableMapMu sync.RWMutex
// enabledMap points to a map in capabilityMaps // enabledMap points to a map in capabilityMaps
enabledMap map[Capability]bool enabledMap map[Capability]bool
curVersion *semver.Version
) )
func init() { func init() {
enabledMap = make(map[Capability]bool) enabledMap = make(map[Capability]bool)
} }
// RunCapabilityLoop checks the cluster version every 500ms and updates // UpdateCapability updates the enabledMap when the cluster version increases.
// the enabledMap when the cluster version increased. func UpdateCapability(v *semver.Version) {
func RunCapabilityLoop(s *etcdserver.EtcdServer) { if v == nil {
go capLoopOnce.Do(func() { runCapabilityLoop(s) }) // if recovered but version was never set by cluster
}
func runCapabilityLoop(s *etcdserver.EtcdServer) {
stopped := s.StopNotify()
var pv *semver.Version
for {
if v := s.ClusterVersion(); v != pv {
if pv == nil || (v != nil && pv.LessThan(*v)) {
pv = v
enableMapMu.Lock()
enabledMap = capabilityMaps[pv.String()]
enableMapMu.Unlock()
plog.Infof("enabled capabilities for version %s", version.Cluster(pv.String()))
}
}
select {
case <-stopped:
return return
case <-time.After(500 * time.Millisecond):
} }
enableMapMu.Lock()
if curVersion != nil && !curVersion.LessThan(*v) {
enableMapMu.Unlock()
return
} }
curVersion = v
enabledMap = capabilityMaps[curVersion.String()]
enableMapMu.Unlock()
plog.Infof("enabled capabilities for version %s", version.Cluster(v.String()))
} }
func IsCapabilityEnabled(c Capability) bool { func IsCapabilityEnabled(c Capability) bool {

View File

@ -130,7 +130,6 @@ func NewClientHandler(server *etcdserver.EtcdServer, timeout time.Duration) http
mux.Handle(pprofPrefix+"/block", pprof.Handler("block")) mux.Handle(pprofPrefix+"/block", pprof.Handler("block"))
} }
api.RunCapabilityLoop(server)
return requestLogger(mux) return requestLogger(mux)
} }

View File

@ -18,7 +18,6 @@ import (
"crypto/tls" "crypto/tls"
"github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/pkg/capnslog" "github.com/coreos/pkg/capnslog"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -47,6 +46,5 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
pb.RegisterAuthServer(grpcServer, NewAuthServer(s)) pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s)) pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))
api.RunCapabilityLoop(s)
return grpcServer return grpcServer
} }

View File

@ -19,6 +19,7 @@ import (
"path" "path"
"time" "time"
"github.com/coreos/etcd/etcdserver/api"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/pbutil"
@ -86,7 +87,7 @@ func (a *applierV2store) Put(r *pb.Request) Response {
} }
if r.Path == membership.StoreClusterVersionKey() { if r.Path == membership.StoreClusterVersionKey() {
if a.cluster != nil { if a.cluster != nil {
a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val))) a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability)
} }
// return an empty response since there is no consumer. // return an empty response since there is no consumer.
return Response{} return Response{}

View File

@ -200,13 +200,14 @@ func (c *RaftCluster) SetBackend(be backend.Backend) {
mustCreateBackendBuckets(c.be) mustCreateBackendBuckets(c.be)
} }
func (c *RaftCluster) Recover() { func (c *RaftCluster) Recover(onSet func(*semver.Version)) {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
c.members, c.removed = membersFromStore(c.store) c.members, c.removed = membersFromStore(c.store)
c.version = clusterVersionFromStore(c.store) c.version = clusterVersionFromStore(c.store)
mustDetectDowngrade(c.version) mustDetectDowngrade(c.version)
onSet(c.version)
for _, m := range c.members { for _, m := range c.members {
plog.Infof("added member %s %v to cluster %s from store", m.ID, m.PeerURLs, c.id) plog.Infof("added member %s %v to cluster %s from store", m.ID, m.PeerURLs, c.id)
@ -356,7 +357,7 @@ func (c *RaftCluster) Version() *semver.Version {
return semver.Must(semver.NewVersion(c.version.String())) return semver.Must(semver.NewVersion(c.version.String()))
} }
func (c *RaftCluster) SetVersion(ver *semver.Version) { func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*semver.Version)) {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
if c.version != nil { if c.version != nil {
@ -372,6 +373,7 @@ func (c *RaftCluster) SetVersion(ver *semver.Version) {
if c.be != nil { if c.be != nil {
mustSaveClusterVersionToBackend(c.be, ver) mustSaveClusterVersionToBackend(c.be, ver)
} }
onSet(ver)
} }
func (c *RaftCluster) IsReadyToAddNewMember() bool { func (c *RaftCluster) IsReadyToAddNewMember() bool {

View File

@ -31,6 +31,7 @@ import (
"github.com/coreos/etcd/auth" "github.com/coreos/etcd/auth"
"github.com/coreos/etcd/compactor" "github.com/coreos/etcd/compactor"
"github.com/coreos/etcd/discovery" "github.com/coreos/etcd/discovery"
"github.com/coreos/etcd/etcdserver/api"
"github.com/coreos/etcd/etcdserver/api/v2http/httptypes" "github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/etcdserver/membership"
@ -356,7 +357,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
} }
cl.SetStore(st) cl.SetStore(st)
cl.SetBackend(be) cl.SetBackend(be)
cl.Recover() cl.Recover(api.UpdateCapability)
if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist { if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
os.RemoveAll(bepath) os.RemoveAll(bepath)
return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath) return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
@ -709,7 +710,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
s.cluster.SetBackend(s.be) s.cluster.SetBackend(s.be)
plog.Info("recovering cluster configuration...") plog.Info("recovering cluster configuration...")
s.cluster.Recover() s.cluster.Recover(api.UpdateCapability)
plog.Info("finished recovering cluster configuration") plog.Info("finished recovering cluster configuration")
plog.Info("removing old peers from network...") plog.Info("removing old peers from network...")