mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #6045 from heyitsanthony/fix-version-race
etcdserver, api, membership: don't race on setting version
This commit is contained in:
commit
13c2d32061
@ -16,9 +16,7 @@ package api
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/version"
|
||||
"github.com/coreos/go-semver/semver"
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
@ -43,45 +41,32 @@ var (
|
||||
"3.0.0": {AuthCapability: true, V3rpcCapability: true},
|
||||
}
|
||||
|
||||
// capLoopOnce ensures we only create one capability monitor goroutine
|
||||
capLoopOnce sync.Once
|
||||
|
||||
enableMapMu sync.RWMutex
|
||||
// enabledMap points to a map in capabilityMaps
|
||||
enabledMap map[Capability]bool
|
||||
|
||||
curVersion *semver.Version
|
||||
)
|
||||
|
||||
func init() {
|
||||
enabledMap = make(map[Capability]bool)
|
||||
}
|
||||
|
||||
// RunCapabilityLoop checks the cluster version every 500ms and updates
|
||||
// the enabledMap when the cluster version increased.
|
||||
func RunCapabilityLoop(s *etcdserver.EtcdServer) {
|
||||
go capLoopOnce.Do(func() { runCapabilityLoop(s) })
|
||||
}
|
||||
|
||||
func runCapabilityLoop(s *etcdserver.EtcdServer) {
|
||||
stopped := s.StopNotify()
|
||||
|
||||
var pv *semver.Version
|
||||
for {
|
||||
if v := s.ClusterVersion(); v != pv {
|
||||
if pv == nil || (v != nil && pv.LessThan(*v)) {
|
||||
pv = v
|
||||
enableMapMu.Lock()
|
||||
enabledMap = capabilityMaps[pv.String()]
|
||||
enableMapMu.Unlock()
|
||||
plog.Infof("enabled capabilities for version %s", version.Cluster(pv.String()))
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-stopped:
|
||||
return
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
}
|
||||
// UpdateCapability updates the enabledMap when the cluster version increases.
|
||||
func UpdateCapability(v *semver.Version) {
|
||||
if v == nil {
|
||||
// if recovered but version was never set by cluster
|
||||
return
|
||||
}
|
||||
enableMapMu.Lock()
|
||||
if curVersion != nil && !curVersion.LessThan(*v) {
|
||||
enableMapMu.Unlock()
|
||||
return
|
||||
}
|
||||
curVersion = v
|
||||
enabledMap = capabilityMaps[curVersion.String()]
|
||||
enableMapMu.Unlock()
|
||||
plog.Infof("enabled capabilities for version %s", version.Cluster(v.String()))
|
||||
}
|
||||
|
||||
func IsCapabilityEnabled(c Capability) bool {
|
||||
|
@ -130,7 +130,6 @@ func NewClientHandler(server *etcdserver.EtcdServer, timeout time.Duration) http
|
||||
mux.Handle(pprofPrefix+"/block", pprof.Handler("block"))
|
||||
}
|
||||
|
||||
api.RunCapabilityLoop(server)
|
||||
return requestLogger(mux)
|
||||
}
|
||||
|
||||
|
@ -18,7 +18,6 @@ import (
|
||||
"crypto/tls"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/api"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
"google.golang.org/grpc"
|
||||
@ -47,6 +46,5 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
|
||||
pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
|
||||
pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))
|
||||
|
||||
api.RunCapabilityLoop(s)
|
||||
return grpcServer
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/etcdserver/membership"
|
||||
"github.com/coreos/etcd/pkg/pbutil"
|
||||
@ -86,7 +87,7 @@ func (a *applierV2store) Put(r *pb.Request) Response {
|
||||
}
|
||||
if r.Path == membership.StoreClusterVersionKey() {
|
||||
if a.cluster != nil {
|
||||
a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)))
|
||||
a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability)
|
||||
}
|
||||
// return an empty response since there is no consumer.
|
||||
return Response{}
|
||||
|
@ -200,13 +200,14 @@ func (c *RaftCluster) SetBackend(be backend.Backend) {
|
||||
mustCreateBackendBuckets(c.be)
|
||||
}
|
||||
|
||||
func (c *RaftCluster) Recover() {
|
||||
func (c *RaftCluster) Recover(onSet func(*semver.Version)) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
c.members, c.removed = membersFromStore(c.store)
|
||||
c.version = clusterVersionFromStore(c.store)
|
||||
mustDetectDowngrade(c.version)
|
||||
onSet(c.version)
|
||||
|
||||
for _, m := range c.members {
|
||||
plog.Infof("added member %s %v to cluster %s from store", m.ID, m.PeerURLs, c.id)
|
||||
@ -356,7 +357,7 @@ func (c *RaftCluster) Version() *semver.Version {
|
||||
return semver.Must(semver.NewVersion(c.version.String()))
|
||||
}
|
||||
|
||||
func (c *RaftCluster) SetVersion(ver *semver.Version) {
|
||||
func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*semver.Version)) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
if c.version != nil {
|
||||
@ -372,6 +373,7 @@ func (c *RaftCluster) SetVersion(ver *semver.Version) {
|
||||
if c.be != nil {
|
||||
mustSaveClusterVersionToBackend(c.be, ver)
|
||||
}
|
||||
onSet(ver)
|
||||
}
|
||||
|
||||
func (c *RaftCluster) IsReadyToAddNewMember() bool {
|
||||
|
@ -31,6 +31,7 @@ import (
|
||||
"github.com/coreos/etcd/auth"
|
||||
"github.com/coreos/etcd/compactor"
|
||||
"github.com/coreos/etcd/discovery"
|
||||
"github.com/coreos/etcd/etcdserver/api"
|
||||
"github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/etcdserver/membership"
|
||||
@ -342,7 +343,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
||||
}
|
||||
cl.SetStore(st)
|
||||
cl.SetBackend(be)
|
||||
cl.Recover()
|
||||
cl.Recover(api.UpdateCapability)
|
||||
if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
|
||||
os.RemoveAll(bepath)
|
||||
return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
|
||||
@ -705,7 +706,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
|
||||
|
||||
s.cluster.SetBackend(s.be)
|
||||
plog.Info("recovering cluster configuration...")
|
||||
s.cluster.Recover()
|
||||
s.cluster.Recover(api.UpdateCapability)
|
||||
plog.Info("finished recovering cluster configuration")
|
||||
|
||||
plog.Info("removing old peers from network...")
|
||||
|
Loading…
x
Reference in New Issue
Block a user