mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #11687 from gyuho/embed-client-version
clientv3: fix "hasleader" metadata key, embed client version
This commit is contained in:
commit
1c16c242db
1
.words
1
.words
@ -30,6 +30,7 @@ etcd
|
||||
gRPC
|
||||
goroutine
|
||||
goroutines
|
||||
hasleader
|
||||
healthcheck
|
||||
hostname
|
||||
iff
|
||||
|
@ -37,7 +37,6 @@ import (
|
||||
"google.golang.org/grpc/codes"
|
||||
grpccredentials "google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
@ -393,13 +392,6 @@ func (c *Client) dialWithBalancerCreds(ep string) grpccredentials.TransportCrede
|
||||
return creds
|
||||
}
|
||||
|
||||
// WithRequireLeader requires client requests to only succeed
|
||||
// when the cluster has a leader.
|
||||
func WithRequireLeader(ctx context.Context) context.Context {
|
||||
md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
|
||||
return metadata.NewOutgoingContext(ctx, md)
|
||||
}
|
||||
|
||||
func newClient(cfg *Config) (*Client, error) {
|
||||
if cfg == nil {
|
||||
cfg = &Config{}
|
||||
|
48
clientv3/ctx.go
Normal file
48
clientv3/ctx.go
Normal file
@ -0,0 +1,48 @@
|
||||
// Copyright 2020 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package clientv3
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
"go.etcd.io/etcd/version"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
// WithRequireLeader requires client requests to only succeed
|
||||
// when the cluster has a leader.
|
||||
func WithRequireLeader(ctx context.Context) context.Context {
|
||||
md, ok := metadata.FromOutgoingContext(ctx)
|
||||
if !ok { // no outgoing metadata ctx key, create one
|
||||
md = metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
|
||||
return metadata.NewOutgoingContext(ctx, md)
|
||||
}
|
||||
// overwrite/add 'hasleader' key/value
|
||||
md.Set(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
|
||||
return metadata.NewOutgoingContext(ctx, md)
|
||||
}
|
||||
|
||||
// embeds client version
|
||||
func withVersion(ctx context.Context) context.Context {
|
||||
md, ok := metadata.FromOutgoingContext(ctx)
|
||||
if !ok { // no outgoing metadata ctx key, create one
|
||||
md = metadata.Pairs(rpctypes.MetadataClientAPIVersionKey, version.APIVersion)
|
||||
return metadata.NewOutgoingContext(ctx, md)
|
||||
}
|
||||
// overwrite/add version key/value
|
||||
md.Set(rpctypes.MetadataClientAPIVersionKey, version.APIVersion)
|
||||
return metadata.NewOutgoingContext(ctx, md)
|
||||
}
|
67
clientv3/ctx_test.go
Normal file
67
clientv3/ctx_test.go
Normal file
@ -0,0 +1,67 @@
|
||||
// Copyright 2020 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package clientv3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
"go.etcd.io/etcd/version"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
func TestMetadataWithRequireLeader(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
md, ok := metadata.FromOutgoingContext(ctx)
|
||||
if ok {
|
||||
t.Fatal("expected no outgoing metadata ctx key")
|
||||
}
|
||||
|
||||
// add a conflicting key with some other value
|
||||
md = metadata.Pairs(rpctypes.MetadataRequireLeaderKey, "invalid")
|
||||
// add a key, and expect not be overwritten
|
||||
md.Set("hello", "1", "2")
|
||||
ctx = metadata.NewOutgoingContext(ctx, md)
|
||||
|
||||
// expect overwrites but still keep other keys
|
||||
ctx = WithRequireLeader(ctx)
|
||||
md, ok = metadata.FromOutgoingContext(ctx)
|
||||
if !ok {
|
||||
t.Fatal("expected outgoing metadata ctx key")
|
||||
}
|
||||
if ss := md.Get(rpctypes.MetadataRequireLeaderKey); !reflect.DeepEqual(ss, []string{rpctypes.MetadataHasLeader}) {
|
||||
t.Fatalf("unexpected metadata for %q %v", rpctypes.MetadataRequireLeaderKey, ss)
|
||||
}
|
||||
if ss := md.Get("hello"); !reflect.DeepEqual(ss, []string{"1", "2"}) {
|
||||
t.Fatalf("unexpected metadata for 'hello' %v", ss)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetadataWithClientAPIVersion(t *testing.T) {
|
||||
ctx := withVersion(WithRequireLeader(context.TODO()))
|
||||
|
||||
md, ok := metadata.FromOutgoingContext(ctx)
|
||||
if !ok {
|
||||
t.Fatal("expected outgoing metadata ctx key")
|
||||
}
|
||||
if ss := md.Get(rpctypes.MetadataRequireLeaderKey); !reflect.DeepEqual(ss, []string{rpctypes.MetadataHasLeader}) {
|
||||
t.Fatalf("unexpected metadata for %q %v", rpctypes.MetadataRequireLeaderKey, ss)
|
||||
}
|
||||
if ss := md.Get(rpctypes.MetadataClientAPIVersionKey); !reflect.DeepEqual(ss, []string{version.APIVersion}) {
|
||||
t.Fatalf("unexpected metadata for %q %v", rpctypes.MetadataClientAPIVersionKey, ss)
|
||||
}
|
||||
}
|
@ -17,8 +17,10 @@ package integration
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
@ -28,6 +30,7 @@ import (
|
||||
"go.etcd.io/etcd/integration"
|
||||
"go.etcd.io/etcd/mvcc/mvccpb"
|
||||
"go.etcd.io/etcd/pkg/testutil"
|
||||
"go.etcd.io/etcd/version"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
@ -208,6 +211,22 @@ func TestKVPutWithRequireLeader(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cnt, err := clus.Members[0].Metric(
|
||||
"etcd_server_client_requests_total",
|
||||
`type="unary"`,
|
||||
fmt.Sprintf(`client_api_version="%v"`, version.APIVersion),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cv, err := strconv.ParseInt(cnt, 10, 32)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if cv < 1 { // >1 when retried
|
||||
t.Fatalf("expected at least 1, got %q", cnt)
|
||||
}
|
||||
|
||||
// clients may give timeout errors since the members are stopped; take
|
||||
// the clients so that terminating the cluster won't complain
|
||||
clus.Client(1).Close()
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -29,6 +30,7 @@ import (
|
||||
"go.etcd.io/etcd/integration"
|
||||
mvccpb "go.etcd.io/etcd/mvcc/mvccpb"
|
||||
"go.etcd.io/etcd/pkg/testutil"
|
||||
"go.etcd.io/etcd/version"
|
||||
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
@ -839,6 +841,22 @@ func TestWatchWithRequireLeader(t *testing.T) {
|
||||
if _, ok := <-chNoLeader; !ok {
|
||||
t.Fatalf("expected response, got closed channel")
|
||||
}
|
||||
|
||||
cnt, err := clus.Members[0].Metric(
|
||||
"etcd_server_client_requests_total",
|
||||
`type="stream"`,
|
||||
fmt.Sprintf(`client_api_version="%v"`, version.APIVersion),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cv, err := strconv.ParseInt(cnt, 10, 32)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if cv < 2 { // >2 when retried
|
||||
t.Fatalf("expected at least 2, got %q", cnt)
|
||||
}
|
||||
}
|
||||
|
||||
// TestWatchWithFilter checks that watch filtering works.
|
||||
|
@ -38,6 +38,7 @@ import (
|
||||
func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.UnaryClientInterceptor {
|
||||
intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
|
||||
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
ctx = withVersion(ctx)
|
||||
grpcOpts, retryOpts := filterCallOptions(opts)
|
||||
callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
|
||||
// short circuit for simplicity, and avoiding allocations.
|
||||
@ -103,6 +104,7 @@ func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOpt
|
||||
func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.StreamClientInterceptor {
|
||||
intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
|
||||
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||
ctx = withVersion(ctx)
|
||||
grpcOpts, retryOpts := filterCallOptions(opts)
|
||||
callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
|
||||
// short circuit for simplicity, and avoiding allocations.
|
||||
|
@ -53,6 +53,12 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
|
||||
|
||||
md, ok := metadata.FromIncomingContext(ctx)
|
||||
if ok {
|
||||
ver, vs := "unknown", md.Get(rpctypes.MetadataClientAPIVersionKey)
|
||||
if len(vs) > 0 {
|
||||
ver = vs[0]
|
||||
}
|
||||
clientRequests.WithLabelValues("unary", ver).Inc()
|
||||
|
||||
if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
|
||||
if s.Leader() == types.ID(raft.None) {
|
||||
return nil, rpctypes.ErrGRPCNoLeader
|
||||
@ -184,6 +190,12 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
|
||||
|
||||
md, ok := metadata.FromIncomingContext(ss.Context())
|
||||
if ok {
|
||||
ver, vs := "unknown", md.Get(rpctypes.MetadataClientAPIVersionKey)
|
||||
if len(vs) > 0 {
|
||||
ver = vs[0]
|
||||
}
|
||||
clientRequests.WithLabelValues("stream", ver).Inc()
|
||||
|
||||
if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
|
||||
if s.Leader() == types.ID(raft.None) {
|
||||
return rpctypes.ErrGRPCNoLeader
|
||||
@ -202,7 +214,6 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
|
||||
smap.mu.Unlock()
|
||||
cancel()
|
||||
}()
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -39,10 +39,20 @@ var (
|
||||
},
|
||||
[]string{"Type", "API"},
|
||||
)
|
||||
|
||||
clientRequests = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "server",
|
||||
Name: "client_requests_total",
|
||||
Help: "The total number of client requests per client version.",
|
||||
},
|
||||
[]string{"type", "client_api_version"},
|
||||
)
|
||||
)
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(sentBytes)
|
||||
prometheus.MustRegister(receivedBytes)
|
||||
prometheus.MustRegister(streamFailures)
|
||||
prometheus.MustRegister(clientRequests)
|
||||
}
|
||||
|
@ -17,4 +17,6 @@ package rpctypes
|
||||
var (
|
||||
MetadataRequireLeaderKey = "hasleader"
|
||||
MetadataHasLeader = "true"
|
||||
|
||||
MetadataClientAPIVersionKey = "client-api-version"
|
||||
)
|
||||
|
@ -1143,7 +1143,7 @@ func (m *member) Terminate(t testing.TB) {
|
||||
}
|
||||
|
||||
// Metric gets the metric value for a member
|
||||
func (m *member) Metric(metricName string) (string, error) {
|
||||
func (m *member) Metric(metricName string, expectLabels ...string) (string, error) {
|
||||
cfgtls := transport.TLSInfo{}
|
||||
tr, err := transport.NewTimeoutTransport(cfgtls, time.Second, time.Second, time.Second)
|
||||
if err != nil {
|
||||
@ -1161,9 +1161,20 @@ func (m *member) Metric(metricName string) (string, error) {
|
||||
}
|
||||
lines := strings.Split(string(b), "\n")
|
||||
for _, l := range lines {
|
||||
if strings.HasPrefix(l, metricName) {
|
||||
return strings.Split(l, " ")[1], nil
|
||||
if !strings.HasPrefix(l, metricName) {
|
||||
continue
|
||||
}
|
||||
ok := true
|
||||
for _, lv := range expectLabels {
|
||||
if !strings.Contains(l, lv) {
|
||||
ok = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
return strings.Split(l, " ")[1], nil
|
||||
}
|
||||
return "", nil
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user