diff --git a/etcdserver/api/capability.go b/etcdserver/api/capability.go new file mode 100644 index 000000000..40318d542 --- /dev/null +++ b/etcdserver/api/capability.go @@ -0,0 +1,99 @@ +// Copyright 2015 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "sync" + "time" + + "github.com/coreos/etcd/etcdserver" + "github.com/coreos/go-semver/semver" + "github.com/coreos/pkg/capnslog" +) + +type Capability string + +const ( + AuthCapability Capability = "auth" + V3rpcCapability Capability = "v3rpc" +) + +var ( + plog = capnslog.NewPackageLogger("github.com/coreos/etcd/etcdserver", "api") + + // capabilityMaps is a static map of version to capability map. + // the base capabilities is the set of capability 2.0 supports. + capabilityMaps = map[string]map[Capability]bool{ + "2.1.0": {AuthCapability: true}, + "2.2.0": {AuthCapability: true}, + "2.3.0": {AuthCapability: true}, + "3.0.0": {AuthCapability: true, V3rpcCapability: true}, + } + + // capLoopOnce ensures we only create one capability monitor goroutine + capLoopOnce sync.Once + + enableMapMu sync.RWMutex + // enabledMap points to a map in capabilityMaps + enabledMap map[Capability]bool +) + +func init() { + enabledMap = make(map[Capability]bool) +} + +// RunCapabilityLoop checks the cluster version every 500ms and updates +// the enabledMap when the cluster version increased. +func RunCapabilityLoop(s *etcdserver.EtcdServer) { + go capLoopOnce.Do(func() { runCapabilityLoop(s) }) +} + +func runCapabilityLoop(s *etcdserver.EtcdServer) { + stopped := s.StopNotify() + + var pv *semver.Version + for { + if v := s.ClusterVersion(); v != pv { + if pv == nil || (v != nil && pv.LessThan(*v)) { + pv = v + enableMapMu.Lock() + enabledMap = capabilityMaps[pv.String()] + enableMapMu.Unlock() + plog.Infof("enabled capabilities for version %s", pv) + } + } + + select { + case <-stopped: + return + case <-time.After(500 * time.Millisecond): + } + } +} + +func IsCapabilityEnabled(c Capability) bool { + enableMapMu.RLock() + defer enableMapMu.RUnlock() + if enabledMap == nil { + return false + } + return enabledMap[c] +} + +func EnableCapability(c Capability) { + enableMapMu.Lock() + defer enableMapMu.Unlock() + enabledMap[c] = true +} diff --git a/etcdserver/api/v2http/capability.go b/etcdserver/api/v2http/capability.go index a9a426219..fa0bcca5e 100644 --- a/etcdserver/api/v2http/capability.go +++ b/etcdserver/api/v2http/capability.go @@ -17,74 +17,14 @@ package v2http import ( "fmt" "net/http" - "sync" - "time" - "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/api" "github.com/coreos/etcd/etcdserver/api/v2http/httptypes" - "github.com/coreos/go-semver/semver" ) -type capability string - -const ( - authCapability capability = "auth" -) - -var ( - // capabilityMaps is a static map of version to capability map. - // the base capabilities is the set of capability 2.0 supports. - capabilityMaps = map[string]map[capability]bool{ - "2.1.0": {authCapability: true}, - "2.2.0": {authCapability: true}, - "2.3.0": {authCapability: true}, - "3.0.0": {authCapability: true}, - } - - enableMapMu sync.Mutex - // enabledMap points to a map in capabilityMaps - enabledMap map[capability]bool -) - -// capabilityLoop checks the cluster version every 500ms and updates -// the enabledMap when the cluster version increased. -// capabilityLoop MUST be ran in a goroutine before checking capability -// or using capabilityHandler. -func capabilityLoop(s *etcdserver.EtcdServer) { - stopped := s.StopNotify() - - var pv *semver.Version - for { - if v := s.ClusterVersion(); v != pv { - if pv == nil || (v != nil && pv.LessThan(*v)) { - pv = v - enableMapMu.Lock() - enabledMap = capabilityMaps[pv.String()] - enableMapMu.Unlock() - plog.Infof("enabled capabilities for version %s", pv) - } - } - - select { - case <-stopped: - return - case <-time.After(500 * time.Millisecond): - } - } -} - -func isCapabilityEnabled(c capability) bool { - enableMapMu.Lock() - defer enableMapMu.Unlock() - if enabledMap == nil { - return false - } - return enabledMap[c] -} - -func capabilityHandler(c capability, fn func(http.ResponseWriter, *http.Request)) http.HandlerFunc { +func capabilityHandler(c api.Capability, fn func(http.ResponseWriter, *http.Request)) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - if !isCapabilityEnabled(c) { + if !api.IsCapabilityEnabled(c) { notCapable(w, r, c) return } @@ -92,7 +32,7 @@ func capabilityHandler(c capability, fn func(http.ResponseWriter, *http.Request) } } -func notCapable(w http.ResponseWriter, r *http.Request, c capability) { +func notCapable(w http.ResponseWriter, r *http.Request, c api.Capability) { herr := httptypes.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("Not capable of accessing %s feature during rolling upgrades.", c)) if err := herr.WriteTo(w); err != nil { plog.Debugf("error writing HTTPError (%v) to %s", err, r.RemoteAddr) diff --git a/etcdserver/api/v2http/client.go b/etcdserver/api/v2http/client.go index 0bcf0636a..620afd11f 100644 --- a/etcdserver/api/v2http/client.go +++ b/etcdserver/api/v2http/client.go @@ -62,8 +62,6 @@ const ( // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests. func NewClientHandler(server *etcdserver.EtcdServer, timeout time.Duration) http.Handler { - go capabilityLoop(server) - sec := auth.NewStore(server, timeout) kh := &keysHandler{ @@ -129,6 +127,7 @@ func NewClientHandler(server *etcdserver.EtcdServer, timeout time.Duration) http mux.Handle(pprofPrefix+"/block", pprof.Handler("block")) } + api.RunCapabilityLoop(server) return requestLogger(mux) } diff --git a/etcdserver/api/v2http/client_auth.go b/etcdserver/api/v2http/client_auth.go index c4344f2d0..cf1585bed 100644 --- a/etcdserver/api/v2http/client_auth.go +++ b/etcdserver/api/v2http/client_auth.go @@ -134,11 +134,11 @@ func writeNoAuth(w http.ResponseWriter, r *http.Request) { } func handleAuth(mux *http.ServeMux, sh *authHandler) { - mux.HandleFunc(authPrefix+"/roles", capabilityHandler(authCapability, sh.baseRoles)) - mux.HandleFunc(authPrefix+"/roles/", capabilityHandler(authCapability, sh.handleRoles)) - mux.HandleFunc(authPrefix+"/users", capabilityHandler(authCapability, sh.baseUsers)) - mux.HandleFunc(authPrefix+"/users/", capabilityHandler(authCapability, sh.handleUsers)) - mux.HandleFunc(authPrefix+"/enable", capabilityHandler(authCapability, sh.enableDisable)) + mux.HandleFunc(authPrefix+"/roles", capabilityHandler(api.AuthCapability, sh.baseRoles)) + mux.HandleFunc(authPrefix+"/roles/", capabilityHandler(api.AuthCapability, sh.handleRoles)) + mux.HandleFunc(authPrefix+"/users", capabilityHandler(api.AuthCapability, sh.baseUsers)) + mux.HandleFunc(authPrefix+"/users/", capabilityHandler(api.AuthCapability, sh.handleUsers)) + mux.HandleFunc(authPrefix+"/enable", capabilityHandler(api.AuthCapability, sh.enableDisable)) } func (sh *authHandler) baseRoles(w http.ResponseWriter, r *http.Request) { diff --git a/etcdserver/api/v2http/client_auth_test.go b/etcdserver/api/v2http/client_auth_test.go index 91d41784e..734e19ac0 100644 --- a/etcdserver/api/v2http/client_auth_test.go +++ b/etcdserver/api/v2http/client_auth_test.go @@ -26,6 +26,7 @@ import ( "strings" "testing" + "github.com/coreos/etcd/etcdserver/api" "github.com/coreos/etcd/etcdserver/auth" ) @@ -103,10 +104,7 @@ func (s *mockAuthStore) HashPassword(password string) (string, error) { } func TestAuthFlow(t *testing.T) { - enableMapMu.Lock() - enabledMap = make(map[capability]bool) - enabledMap[authCapability] = true - enableMapMu.Unlock() + api.EnableCapability(api.AuthCapability) var testCases = []struct { req *http.Request store mockAuthStore diff --git a/etcdserver/api/v3rpc/grpc.go b/etcdserver/api/v3rpc/grpc.go index 3ef980486..47b807496 100644 --- a/etcdserver/api/v3rpc/grpc.go +++ b/etcdserver/api/v3rpc/grpc.go @@ -18,6 +18,7 @@ import ( "crypto/tls" "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/api" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -38,5 +39,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server { pb.RegisterClusterServer(grpcServer, NewClusterServer(s)) pb.RegisterAuthServer(grpcServer, NewAuthServer(s)) pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s)) + + api.RunCapabilityLoop(s) return grpcServer } diff --git a/etcdserver/api/v3rpc/interceptor.go b/etcdserver/api/v3rpc/interceptor.go index 2c70d84fd..f18f13626 100644 --- a/etcdserver/api/v3rpc/interceptor.go +++ b/etcdserver/api/v3rpc/interceptor.go @@ -20,6 +20,7 @@ import ( "time" "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/api" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" @@ -40,6 +41,10 @@ type streamsMap struct { func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + if !api.IsCapabilityEnabled(api.V3rpcCapability) { + return nil, rpctypes.ErrGRPCNotCapable + } + md, ok := metadata.FromContext(ctx) if ok { if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader { @@ -56,6 +61,10 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor smap := monitorLeader(s) return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if !api.IsCapabilityEnabled(api.V3rpcCapability) { + return rpctypes.ErrGRPCNotCapable + } + md, ok := metadata.FromContext(ss.Context()) if ok { if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader { diff --git a/etcdserver/api/v3rpc/rpctypes/error.go b/etcdserver/api/v3rpc/rpctypes/error.go index a86afdc50..16909287f 100644 --- a/etcdserver/api/v3rpc/rpctypes/error.go +++ b/etcdserver/api/v3rpc/rpctypes/error.go @@ -44,7 +44,8 @@ var ( ErrGRPCRoleNotFound = grpc.Errorf(codes.FailedPrecondition, "etcdserver: role name not found") ErrGRPCAuthFailed = grpc.Errorf(codes.InvalidArgument, "etcdserver: authentication failed, invalid user ID or password") - ErrGRPCNoLeader = grpc.Errorf(codes.Unavailable, "etcdserver: no leader") + ErrGRPCNoLeader = grpc.Errorf(codes.Unavailable, "etcdserver: no leader") + ErrGRPCNotCapable = grpc.Errorf(codes.Unavailable, "etcdserver: not capable") errStringToError = map[string]error{ grpc.ErrorDesc(ErrGRPCEmptyKey): ErrGRPCEmptyKey, @@ -70,7 +71,8 @@ var ( grpc.ErrorDesc(ErrGRPCRoleNotFound): ErrGRPCRoleNotFound, grpc.ErrorDesc(ErrGRPCAuthFailed): ErrGRPCAuthFailed, - grpc.ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader, + grpc.ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader, + grpc.ErrorDesc(ErrGRPCNotCapable): ErrGRPCNotCapable, } // client-side error @@ -97,7 +99,8 @@ var ( ErrRoleNotFound = Error(ErrGRPCRoleNotFound) ErrAuthFailed = Error(ErrGRPCAuthFailed) - ErrNoLeader = Error(ErrGRPCNoLeader) + ErrNoLeader = Error(ErrGRPCNoLeader) + ErrNotCapable = Error(ErrGRPCNotCapable) ) // EtcdError defines gRPC server errors. diff --git a/integration/cluster.go b/integration/cluster.go index 613bec1ff..9b6e749c3 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -37,6 +37,7 @@ import ( "github.com/coreos/etcd/client" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/api" "github.com/coreos/etcd/etcdserver/api/v2http" "github.com/coreos/etcd/etcdserver/api/v3rpc" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" @@ -759,6 +760,10 @@ func NewClusterV3(t *testing.T, cfg *ClusterConfig) *ClusterV3 { clus.clients = append(clus.clients, client) } clus.Launch(t) + + // manually enable v3 capability since we know we are starting a v3 cluster here. + api.EnableCapability(api.V3rpcCapability) + return clus } diff --git a/integration/v3_barrier_test.go b/integration/v3_barrier_test.go index edf11a5d4..f86999c47 100644 --- a/integration/v3_barrier_test.go +++ b/integration/v3_barrier_test.go @@ -25,7 +25,7 @@ import ( func TestBarrierSingleNode(t *testing.T) { defer testutil.AfterTest(t) - clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) defer clus.Terminate(t) testBarrier(t, 5, func() *clientv3.Client { return clus.clients[0] }) }