mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
api: add v3rpc capability
This commit is contained in:
parent
82c6408f38
commit
1c544c3ba5
99
etcdserver/api/capability.go
Normal file
99
etcdserver/api/capability.go
Normal file
@ -0,0 +1,99 @@
|
||||
// Copyright 2015 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package api
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/go-semver/semver"
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
)
|
||||
|
||||
type Capability string
|
||||
|
||||
const (
|
||||
AuthCapability Capability = "auth"
|
||||
V3rpcCapability Capability = "v3rpc"
|
||||
)
|
||||
|
||||
var (
|
||||
plog = capnslog.NewPackageLogger("github.com/coreos/etcd/etcdserver", "api")
|
||||
|
||||
// capabilityMaps is a static map of version to capability map.
|
||||
// the base capabilities is the set of capability 2.0 supports.
|
||||
capabilityMaps = map[string]map[Capability]bool{
|
||||
"2.1.0": {AuthCapability: true},
|
||||
"2.2.0": {AuthCapability: true},
|
||||
"2.3.0": {AuthCapability: true},
|
||||
"3.0.0": {AuthCapability: true, V3rpcCapability: true},
|
||||
}
|
||||
|
||||
// capLoopOnce ensures we only create one capability monitor goroutine
|
||||
capLoopOnce sync.Once
|
||||
|
||||
enableMapMu sync.RWMutex
|
||||
// enabledMap points to a map in capabilityMaps
|
||||
enabledMap map[Capability]bool
|
||||
)
|
||||
|
||||
func init() {
|
||||
enabledMap = make(map[Capability]bool)
|
||||
}
|
||||
|
||||
// RunCapabilityLoop checks the cluster version every 500ms and updates
|
||||
// the enabledMap when the cluster version increased.
|
||||
func RunCapabilityLoop(s *etcdserver.EtcdServer) {
|
||||
go capLoopOnce.Do(func() { runCapabilityLoop(s) })
|
||||
}
|
||||
|
||||
func runCapabilityLoop(s *etcdserver.EtcdServer) {
|
||||
stopped := s.StopNotify()
|
||||
|
||||
var pv *semver.Version
|
||||
for {
|
||||
if v := s.ClusterVersion(); v != pv {
|
||||
if pv == nil || (v != nil && pv.LessThan(*v)) {
|
||||
pv = v
|
||||
enableMapMu.Lock()
|
||||
enabledMap = capabilityMaps[pv.String()]
|
||||
enableMapMu.Unlock()
|
||||
plog.Infof("enabled capabilities for version %s", pv)
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-stopped:
|
||||
return
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func IsCapabilityEnabled(c Capability) bool {
|
||||
enableMapMu.RLock()
|
||||
defer enableMapMu.RUnlock()
|
||||
if enabledMap == nil {
|
||||
return false
|
||||
}
|
||||
return enabledMap[c]
|
||||
}
|
||||
|
||||
func EnableCapability(c Capability) {
|
||||
enableMapMu.Lock()
|
||||
defer enableMapMu.Unlock()
|
||||
enabledMap[c] = true
|
||||
}
|
@ -17,74 +17,14 @@ package v2http
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/api"
|
||||
"github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
|
||||
"github.com/coreos/go-semver/semver"
|
||||
)
|
||||
|
||||
type capability string
|
||||
|
||||
const (
|
||||
authCapability capability = "auth"
|
||||
)
|
||||
|
||||
var (
|
||||
// capabilityMaps is a static map of version to capability map.
|
||||
// the base capabilities is the set of capability 2.0 supports.
|
||||
capabilityMaps = map[string]map[capability]bool{
|
||||
"2.1.0": {authCapability: true},
|
||||
"2.2.0": {authCapability: true},
|
||||
"2.3.0": {authCapability: true},
|
||||
"3.0.0": {authCapability: true},
|
||||
}
|
||||
|
||||
enableMapMu sync.Mutex
|
||||
// enabledMap points to a map in capabilityMaps
|
||||
enabledMap map[capability]bool
|
||||
)
|
||||
|
||||
// capabilityLoop checks the cluster version every 500ms and updates
|
||||
// the enabledMap when the cluster version increased.
|
||||
// capabilityLoop MUST be ran in a goroutine before checking capability
|
||||
// or using capabilityHandler.
|
||||
func capabilityLoop(s *etcdserver.EtcdServer) {
|
||||
stopped := s.StopNotify()
|
||||
|
||||
var pv *semver.Version
|
||||
for {
|
||||
if v := s.ClusterVersion(); v != pv {
|
||||
if pv == nil || (v != nil && pv.LessThan(*v)) {
|
||||
pv = v
|
||||
enableMapMu.Lock()
|
||||
enabledMap = capabilityMaps[pv.String()]
|
||||
enableMapMu.Unlock()
|
||||
plog.Infof("enabled capabilities for version %s", pv)
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-stopped:
|
||||
return
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func isCapabilityEnabled(c capability) bool {
|
||||
enableMapMu.Lock()
|
||||
defer enableMapMu.Unlock()
|
||||
if enabledMap == nil {
|
||||
return false
|
||||
}
|
||||
return enabledMap[c]
|
||||
}
|
||||
|
||||
func capabilityHandler(c capability, fn func(http.ResponseWriter, *http.Request)) http.HandlerFunc {
|
||||
func capabilityHandler(c api.Capability, fn func(http.ResponseWriter, *http.Request)) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if !isCapabilityEnabled(c) {
|
||||
if !api.IsCapabilityEnabled(c) {
|
||||
notCapable(w, r, c)
|
||||
return
|
||||
}
|
||||
@ -92,7 +32,7 @@ func capabilityHandler(c capability, fn func(http.ResponseWriter, *http.Request)
|
||||
}
|
||||
}
|
||||
|
||||
func notCapable(w http.ResponseWriter, r *http.Request, c capability) {
|
||||
func notCapable(w http.ResponseWriter, r *http.Request, c api.Capability) {
|
||||
herr := httptypes.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("Not capable of accessing %s feature during rolling upgrades.", c))
|
||||
if err := herr.WriteTo(w); err != nil {
|
||||
plog.Debugf("error writing HTTPError (%v) to %s", err, r.RemoteAddr)
|
||||
|
@ -62,8 +62,6 @@ const (
|
||||
|
||||
// NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
|
||||
func NewClientHandler(server *etcdserver.EtcdServer, timeout time.Duration) http.Handler {
|
||||
go capabilityLoop(server)
|
||||
|
||||
sec := auth.NewStore(server, timeout)
|
||||
|
||||
kh := &keysHandler{
|
||||
@ -129,6 +127,7 @@ func NewClientHandler(server *etcdserver.EtcdServer, timeout time.Duration) http
|
||||
mux.Handle(pprofPrefix+"/block", pprof.Handler("block"))
|
||||
}
|
||||
|
||||
api.RunCapabilityLoop(server)
|
||||
return requestLogger(mux)
|
||||
}
|
||||
|
||||
|
@ -134,11 +134,11 @@ func writeNoAuth(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func handleAuth(mux *http.ServeMux, sh *authHandler) {
|
||||
mux.HandleFunc(authPrefix+"/roles", capabilityHandler(authCapability, sh.baseRoles))
|
||||
mux.HandleFunc(authPrefix+"/roles/", capabilityHandler(authCapability, sh.handleRoles))
|
||||
mux.HandleFunc(authPrefix+"/users", capabilityHandler(authCapability, sh.baseUsers))
|
||||
mux.HandleFunc(authPrefix+"/users/", capabilityHandler(authCapability, sh.handleUsers))
|
||||
mux.HandleFunc(authPrefix+"/enable", capabilityHandler(authCapability, sh.enableDisable))
|
||||
mux.HandleFunc(authPrefix+"/roles", capabilityHandler(api.AuthCapability, sh.baseRoles))
|
||||
mux.HandleFunc(authPrefix+"/roles/", capabilityHandler(api.AuthCapability, sh.handleRoles))
|
||||
mux.HandleFunc(authPrefix+"/users", capabilityHandler(api.AuthCapability, sh.baseUsers))
|
||||
mux.HandleFunc(authPrefix+"/users/", capabilityHandler(api.AuthCapability, sh.handleUsers))
|
||||
mux.HandleFunc(authPrefix+"/enable", capabilityHandler(api.AuthCapability, sh.enableDisable))
|
||||
}
|
||||
|
||||
func (sh *authHandler) baseRoles(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api"
|
||||
"github.com/coreos/etcd/etcdserver/auth"
|
||||
)
|
||||
|
||||
@ -103,10 +104,7 @@ func (s *mockAuthStore) HashPassword(password string) (string, error) {
|
||||
}
|
||||
|
||||
func TestAuthFlow(t *testing.T) {
|
||||
enableMapMu.Lock()
|
||||
enabledMap = make(map[capability]bool)
|
||||
enabledMap[authCapability] = true
|
||||
enableMapMu.Unlock()
|
||||
api.EnableCapability(api.AuthCapability)
|
||||
var testCases = []struct {
|
||||
req *http.Request
|
||||
store mockAuthStore
|
||||
|
@ -18,6 +18,7 @@ import (
|
||||
"crypto/tls"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/api"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
@ -38,5 +39,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
|
||||
pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
|
||||
pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
|
||||
pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))
|
||||
|
||||
api.RunCapabilityLoop(s)
|
||||
return grpcServer
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/api"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft"
|
||||
@ -40,6 +41,10 @@ type streamsMap struct {
|
||||
|
||||
func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
|
||||
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||
if !api.IsCapabilityEnabled(api.V3rpcCapability) {
|
||||
return nil, rpctypes.ErrGRPCNotCapable
|
||||
}
|
||||
|
||||
md, ok := metadata.FromContext(ctx)
|
||||
if ok {
|
||||
if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
|
||||
@ -56,6 +61,10 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
|
||||
smap := monitorLeader(s)
|
||||
|
||||
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
||||
if !api.IsCapabilityEnabled(api.V3rpcCapability) {
|
||||
return rpctypes.ErrGRPCNotCapable
|
||||
}
|
||||
|
||||
md, ok := metadata.FromContext(ss.Context())
|
||||
if ok {
|
||||
if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
|
||||
|
@ -44,7 +44,8 @@ var (
|
||||
ErrGRPCRoleNotFound = grpc.Errorf(codes.FailedPrecondition, "etcdserver: role name not found")
|
||||
ErrGRPCAuthFailed = grpc.Errorf(codes.InvalidArgument, "etcdserver: authentication failed, invalid user ID or password")
|
||||
|
||||
ErrGRPCNoLeader = grpc.Errorf(codes.Unavailable, "etcdserver: no leader")
|
||||
ErrGRPCNoLeader = grpc.Errorf(codes.Unavailable, "etcdserver: no leader")
|
||||
ErrGRPCNotCapable = grpc.Errorf(codes.Unavailable, "etcdserver: not capable")
|
||||
|
||||
errStringToError = map[string]error{
|
||||
grpc.ErrorDesc(ErrGRPCEmptyKey): ErrGRPCEmptyKey,
|
||||
@ -70,7 +71,8 @@ var (
|
||||
grpc.ErrorDesc(ErrGRPCRoleNotFound): ErrGRPCRoleNotFound,
|
||||
grpc.ErrorDesc(ErrGRPCAuthFailed): ErrGRPCAuthFailed,
|
||||
|
||||
grpc.ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader,
|
||||
grpc.ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader,
|
||||
grpc.ErrorDesc(ErrGRPCNotCapable): ErrGRPCNotCapable,
|
||||
}
|
||||
|
||||
// client-side error
|
||||
@ -97,7 +99,8 @@ var (
|
||||
ErrRoleNotFound = Error(ErrGRPCRoleNotFound)
|
||||
ErrAuthFailed = Error(ErrGRPCAuthFailed)
|
||||
|
||||
ErrNoLeader = Error(ErrGRPCNoLeader)
|
||||
ErrNoLeader = Error(ErrGRPCNoLeader)
|
||||
ErrNotCapable = Error(ErrGRPCNotCapable)
|
||||
)
|
||||
|
||||
// EtcdError defines gRPC server errors.
|
||||
|
@ -37,6 +37,7 @@ import (
|
||||
"github.com/coreos/etcd/client"
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/api"
|
||||
"github.com/coreos/etcd/etcdserver/api/v2http"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
@ -746,6 +747,10 @@ func NewClusterV3(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
|
||||
clus.clients = append(clus.clients, client)
|
||||
}
|
||||
clus.Launch(t)
|
||||
|
||||
// manually enable v3 capability since we know we are starting a v3 cluster here.
|
||||
api.EnableCapability(api.V3rpcCapability)
|
||||
|
||||
return clus
|
||||
}
|
||||
|
||||
|
@ -25,7 +25,7 @@ import (
|
||||
|
||||
func TestBarrierSingleNode(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
testBarrier(t, 5, func() *clientv3.Client { return clus.clients[0] })
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user