From 29abd62338573687c182abb5fe2f7a5b70aa31f7 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 23 May 2022 18:45:22 +0800 Subject: [PATCH] introduce GRPCAdditionalServerOptions Signed-off-by: Ryan Leung --- server/embed/config.go | 4 ++ server/embed/etcd.go | 1 + tests/framework/integration/cluster.go | 15 ++++-- tests/integration/clientv3/kv_test.go | 2 - tests/integration/v3_grpc_test.go | 64 ++++++++++++++++++++++++++ 5 files changed, 79 insertions(+), 7 deletions(-) diff --git a/server/embed/config.go b/server/embed/config.go index 0545c80cd..00cf3cf7d 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -284,6 +284,10 @@ type Config struct { // before closing a non-responsive connection. 0 to disable. GRPCKeepAliveTimeout time.Duration `json:"grpc-keepalive-timeout"` + // GRPCAdditionalServerOptions is the additional server option hook + // for changing the default internal gRPC configuration. + GRPCAdditionalServerOptions []grpc.ServerOption `json:"grpc-additional-server-options"` + // SocketOpts are socket options passed to listener config. SocketOpts transport.SocketOpts `json:"socket-options"` diff --git a/server/embed/etcd.go b/server/embed/etcd.go index ce3b33986..cb32d182d 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -761,6 +761,7 @@ func (e *Etcd) serveClients() { Timeout: e.cfg.GRPCKeepAliveTimeout, })) } + gopts = append(gopts, e.cfg.GRPCAdditionalServerOptions...) splitHTTP := false for _, sctx := range e.sctxs { diff --git a/tests/framework/integration/cluster.go b/tests/framework/integration/cluster.go index 4f0f550ae..e3ef2a448 100644 --- a/tests/framework/integration/cluster.go +++ b/tests/framework/integration/cluster.go @@ -144,14 +144,16 @@ type ClusterConfig struct { QuotaBackendBytes int64 BackendBatchInterval time.Duration - MaxTxnOps uint - MaxRequestBytes uint + MaxTxnOps uint + MaxRequestBytes uint + SnapshotCount uint64 SnapshotCatchUpEntries uint64 - GRPCKeepAliveMinTime time.Duration - GRPCKeepAliveInterval time.Duration - GRPCKeepAliveTimeout time.Duration + GRPCKeepAliveMinTime time.Duration + GRPCKeepAliveInterval time.Duration + GRPCKeepAliveTimeout time.Duration + GRPCAdditionalServerOptions []grpc.ServerOption ClientMaxCallSendMsgSize int ClientMaxCallRecvMsgSize int @@ -278,6 +280,7 @@ func (c *Cluster) mustNewMember(t testutil.TB) *Member { GRPCKeepAliveMinTime: c.Cfg.GRPCKeepAliveMinTime, GRPCKeepAliveInterval: c.Cfg.GRPCKeepAliveInterval, GRPCKeepAliveTimeout: c.Cfg.GRPCKeepAliveTimeout, + GRPCAdditionalServerOptions: c.Cfg.GRPCAdditionalServerOptions, ClientMaxCallSendMsgSize: c.Cfg.ClientMaxCallSendMsgSize, ClientMaxCallRecvMsgSize: c.Cfg.ClientMaxCallRecvMsgSize, UseIP: c.Cfg.UseIP, @@ -603,6 +606,7 @@ type MemberConfig struct { GRPCKeepAliveMinTime time.Duration GRPCKeepAliveInterval time.Duration GRPCKeepAliveTimeout time.Duration + GRPCAdditionalServerOptions []grpc.ServerOption ClientMaxCallSendMsgSize int ClientMaxCallRecvMsgSize int UseIP bool @@ -709,6 +713,7 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member { Timeout: mcfg.GRPCKeepAliveTimeout, })) } + m.GRPCServerOpts = append(m.GRPCServerOpts, mcfg.GRPCAdditionalServerOptions...) m.ClientMaxCallSendMsgSize = mcfg.ClientMaxCallSendMsgSize m.ClientMaxCallRecvMsgSize = mcfg.ClientMaxCallRecvMsgSize m.UseIP = mcfg.UseIP diff --git a/tests/integration/clientv3/kv_test.go b/tests/integration/clientv3/kv_test.go index 6443717c9..223058a03 100644 --- a/tests/integration/clientv3/kv_test.go +++ b/tests/integration/clientv3/kv_test.go @@ -702,14 +702,12 @@ func TestKVLargeRequests(t *testing.T) { // without proper client-side receive size limit // "code = ResourceExhausted desc = grpc: received message larger than max (5242929 vs. 4194304)" { - maxRequestBytesServer: 7*1024*1024 + 512*1024, maxCallSendBytesClient: 7 * 1024 * 1024, maxCallRecvBytesClient: 0, valueSize: 5 * 1024 * 1024, expectError: nil, }, - { maxRequestBytesServer: 10 * 1024 * 1024, maxCallSendBytesClient: 100 * 1024 * 1024, diff --git a/tests/integration/v3_grpc_test.go b/tests/integration/v3_grpc_test.go index 4e4cdf9fe..2cc4ab660 100644 --- a/tests/integration/v3_grpc_test.go +++ b/tests/integration/v3_grpc_test.go @@ -1923,7 +1923,71 @@ func TestV3LargeRequests(t *testing.T) { t.Errorf("#%d: range expected no error, got %v", i, err) } } + }) + } +} +// TestV3AdditionalGRPCOptions ensures that configurable GRPCAdditionalServerOptions works as intended. +func TestV3AdditionalGRPCOptions(t *testing.T) { + integration.BeforeTest(t) + tests := []struct { + name string + maxRequestBytes uint + grpcOpts []grpc.ServerOption + valueSize int + expectError error + }{ + { + name: "requests will get a gRPC error because it's larger than gRPC MaxRecvMsgSize", + maxRequestBytes: 8 * 1024 * 1024, + grpcOpts: nil, + valueSize: 9 * 1024 * 1024, + expectError: status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max"), + }, + { + name: "requests will get an etcd custom gRPC error because it's larger than MaxRequestBytes", + maxRequestBytes: 8 * 1024 * 1024, + grpcOpts: []grpc.ServerOption{grpc.MaxRecvMsgSize(10 * 1024 * 1024)}, + valueSize: 9 * 1024 * 1024, + expectError: rpctypes.ErrGRPCRequestTooLarge, + }, + { + name: "requests size is smaller than MaxRequestBytes but larger than MaxRecvMsgSize", + maxRequestBytes: 8 * 1024 * 1024, + grpcOpts: []grpc.ServerOption{grpc.MaxRecvMsgSize(4 * 1024 * 1024)}, + valueSize: 6 * 1024 * 1024, + expectError: status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max"), + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + clus := integration.NewCluster(t, &integration.ClusterConfig{ + Size: 1, + MaxRequestBytes: test.maxRequestBytes, + ClientMaxCallSendMsgSize: 12 * 1024 * 1024, + GRPCAdditionalServerOptions: test.grpcOpts, + }) + defer clus.Terminate(t) + kvcli := integration.ToGRPC(clus.Client(0)).KV + reqput := &pb.PutRequest{Key: []byte("foo"), Value: make([]byte, test.valueSize)} + if _, err := kvcli.Put(context.TODO(), reqput); err != nil { + if _, ok := err.(rpctypes.EtcdError); ok { + if err.Error() != status.Convert(test.expectError).Message() { + t.Errorf("expected %v, got %v", status.Convert(test.expectError).Message(), err.Error()) + } + } else if !strings.HasPrefix(err.Error(), test.expectError.Error()) { + t.Errorf("expected error starting with '%s', got '%s'", test.expectError.Error(), err.Error()) + } + } + // request went through, expect large response back from server + if test.expectError == nil { + reqget := &pb.RangeRequest{Key: []byte("foo")} + // limit receive call size with original value + gRPC overhead bytes + _, err := kvcli.Range(context.TODO(), reqget, grpc.MaxCallRecvMsgSize(test.valueSize+512*1024)) + if err != nil { + t.Errorf("range expected no error, got %v", err) + } + } }) } }