diff --git a/etcdserver/api/v3rpc/grpc.go b/etcdserver/api/v3rpc/grpc.go index df4e00937..fad1f43d6 100644 --- a/etcdserver/api/v3rpc/grpc.go +++ b/etcdserver/api/v3rpc/grpc.go @@ -28,7 +28,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server { if tls != nil { opts = append(opts, grpc.Creds(credentials.NewTLS(tls))) } - opts = append(opts, grpc.UnaryInterceptor(metricsUnaryInterceptor)) + opts = append(opts, grpc.UnaryInterceptor(newUnaryInterceptor(s))) opts = append(opts, grpc.StreamInterceptor(metricsStreamInterceptor)) grpcServer := grpc.NewServer(opts...) diff --git a/etcdserver/api/v3rpc/interceptor.go b/etcdserver/api/v3rpc/interceptor.go new file mode 100644 index 000000000..6872bfd11 --- /dev/null +++ b/etcdserver/api/v3rpc/interceptor.go @@ -0,0 +1,77 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v3rpc + +import ( + "strings" + "time" + + "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft" + + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + md, ok := metadata.FromContext(ctx) + if ok { + if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader { + if s.Leader() == types.ID(raft.None) { + return nil, rpctypes.ErrGRPCNoLeader + } + } + } + return metricsUnaryInterceptor(ctx, req, info, handler) + } +} + +func metricsUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + service, method := splitMethodName(info.FullMethod) + receivedCounter.WithLabelValues(service, method).Inc() + + start := time.Now() + resp, err = handler(ctx, req) + if err != nil { + failedCounter.WithLabelValues(service, method, grpc.Code(err).String()).Inc() + } + handlingDuration.WithLabelValues(service, method).Observe(time.Since(start).Seconds()) + + return resp, err +} + +func metricsStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + service, method := splitMethodName(info.FullMethod) + receivedCounter.WithLabelValues(service, method).Inc() + + err := handler(srv, ss) + if err != nil { + failedCounter.WithLabelValues(service, method, grpc.Code(err).String()).Inc() + } + + return err +} + +func splitMethodName(fullMethodName string) (string, string) { + fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash + if i := strings.Index(fullMethodName, "/"); i >= 0 { + return fullMethodName[:i], fullMethodName[i+1:] + } + return "unknown", "unknown" +} diff --git a/etcdserver/api/v3rpc/metrics.go b/etcdserver/api/v3rpc/metrics.go index 169b10eac..bb7c636d2 100644 --- a/etcdserver/api/v3rpc/metrics.go +++ b/etcdserver/api/v3rpc/metrics.go @@ -14,15 +14,7 @@ package v3rpc -import ( - "strings" - "time" - - "github.com/prometheus/client_golang/prometheus" - - "golang.org/x/net/context" - "google.golang.org/grpc" -) +import "github.com/prometheus/client_golang/prometheus" var ( receivedCounter = prometheus.NewCounterVec( @@ -51,40 +43,6 @@ var ( }, []string{"grpc_service", "grpc_method"}) ) -func metricsUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - service, method := splitMethodName(info.FullMethod) - receivedCounter.WithLabelValues(service, method).Inc() - - start := time.Now() - resp, err = handler(ctx, req) - if err != nil { - failedCounter.WithLabelValues(service, method, grpc.Code(err).String()).Inc() - } - handlingDuration.WithLabelValues(service, method).Observe(time.Since(start).Seconds()) - - return resp, err -} - -func metricsStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - service, method := splitMethodName(info.FullMethod) - receivedCounter.WithLabelValues(service, method).Inc() - - err := handler(srv, ss) - if err != nil { - failedCounter.WithLabelValues(service, method, grpc.Code(err).String()).Inc() - } - - return err -} - -func splitMethodName(fullMethodName string) (string, string) { - fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash - if i := strings.Index(fullMethodName, "/"); i >= 0 { - return fullMethodName[:i], fullMethodName[i+1:] - } - return "unknown", "unknown" -} - func init() { prometheus.MustRegister(receivedCounter) prometheus.MustRegister(failedCounter) diff --git a/etcdserver/api/v3rpc/rpctypes/error.go b/etcdserver/api/v3rpc/rpctypes/error.go index 5b0d5630c..2b1437dd7 100644 --- a/etcdserver/api/v3rpc/rpctypes/error.go +++ b/etcdserver/api/v3rpc/rpctypes/error.go @@ -44,6 +44,8 @@ var ( ErrGRPCRoleNotFound = grpc.Errorf(codes.FailedPrecondition, "etcdserver: role name not found") ErrGRPCAuthFailed = grpc.Errorf(codes.InvalidArgument, "etcdserver: authentication failed, invalid user ID or password") + ErrGRPCNoLeader = grpc.Errorf(codes.Unavailable, "etcdserver: no leader") + errStringToError = map[string]error{ grpc.ErrorDesc(ErrGRPCEmptyKey): ErrGRPCEmptyKey, grpc.ErrorDesc(ErrGRPCTooManyOps): ErrGRPCTooManyOps, @@ -67,6 +69,8 @@ var ( grpc.ErrorDesc(ErrGRPCRoleAlreadyExist): ErrGRPCRoleAlreadyExist, grpc.ErrorDesc(ErrGRPCRoleNotFound): ErrGRPCRoleNotFound, grpc.ErrorDesc(ErrGRPCAuthFailed): ErrGRPCAuthFailed, + + grpc.ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader, } // client-side error @@ -92,6 +96,8 @@ var ( ErrRoleAlreadyExist = Error(ErrGRPCRoleAlreadyExist) ErrRoleNotFound = Error(ErrGRPCRoleNotFound) ErrAuthFailed = Error(ErrGRPCAuthFailed) + + ErrNoLeader = Error(ErrGRPCNoLeader) ) // EtcdError defines gRPC server errors. diff --git a/etcdserver/api/v3rpc/rpctypes/md.go b/etcdserver/api/v3rpc/rpctypes/md.go new file mode 100644 index 000000000..9ef96e41c --- /dev/null +++ b/etcdserver/api/v3rpc/rpctypes/md.go @@ -0,0 +1,20 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rpctypes + +var ( + MetadataRequireLeaderKey = "hasleader" + MetadataHasLeader = "true" +) diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index 514de72d0..95f2c24a9 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -27,6 +27,7 @@ import ( "github.com/coreos/etcd/pkg/testutil" "golang.org/x/net/context" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) // TestV3PutOverwrite puts a key with the v3 api to a random cluster member, @@ -976,3 +977,30 @@ func TestTLSGRPCAcceptSecureAll(t *testing.T) { t.Fatalf("unexpected error on put over tls (%v)", err) } } + +func TestGRPCRequireLeader(t *testing.T) { + defer testutil.AfterTest(t) + + cfg := ClusterConfig{Size: 3} + clus := newClusterV3NoClients(t, &cfg) + defer clus.Terminate(t) + + clus.Members[1].Stop(t) + clus.Members[2].Stop(t) + + client, err := NewClientV3(clus.Members[0]) + if err != nil { + t.Fatalf("expected tls client (%v)", err) + } + defer client.Close() + + // wait for election timeout, then member[0] will not have a leader. + time.Sleep(time.Duration(3*electionTicks) * tickDuration) + + md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) + ctx := metadata.NewContext(context.Background(), md) + reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")} + if _, err := toGRPC(client).KV.Put(ctx, reqput); grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() { + t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader) + } +}