Merge pull request #9047 from gyuho/client-grpc-call-options

*: configurable gRPC message size limits
This commit is contained in:
Gyuho Lee 2017-12-20 12:25:53 -08:00 committed by GitHub
commit 94e50a1e68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 617 additions and 149 deletions

3
.words
View File

@ -1,8 +1,11 @@
DefaultMaxRequestBytes
ErrCodeEnhanceYourCalm
ErrTimeout
GoAway
KeepAlive
Keepalive
MiB
ResourceExhausted
RPC
RPCs
TODO

View File

@ -6,57 +6,38 @@ In the general case, upgrading from etcd 3.1 to 3.2 can be a zero-downtime, roll
Before [starting an upgrade](#upgrade-procedure), read through the rest of this guide to prepare.
### Server upgrade checklists (breaking change)
### Upgrade checklists
3.2 now rejects domains names for `--listen-peer-urls` and `--listen-client-urls` (3.1 only prints out warnings), since domain name is invalid for network interface binding. Make sure that those URLs are properly formated as `scheme://IP:port`.
Highlighted breaking changes in 3.2.
See [issue #6336](https://github.com/coreos/etcd/issues/6336) for more contexts.
#### Change in gRPC dependency (>=3.2.10)
### Client upgrade checklists (>=3.2.0)
3.2.10 or later now requires [grpc/grpc-go](https://github.com/grpc/grpc-go/releases) `v1.7.5` (<=3.2.9 requires `v1.2.1`).
3.2 introduces two breaking changes.
##### Deprecate `grpclog.Logger`
Previously, `clientv3.Lease.TimeToLive` API returned `lease.ErrLeaseNotFound` on non-existent lease ID. 3.2 instead returns TTL=-1 in its response and no error (see [#7305](https://github.com/coreos/etcd/pull/7305)).
Before
```go
// when leaseID does not exist
resp, err := TimeToLive(ctx, leaseID)
resp == nil
err == lease.ErrLeaseNotFound
```
After
```go
// when leaseID does not exist
resp, err := TimeToLive(ctx, leaseID)
resp.TTL == -1
err == nil
```
`clientv3.NewFromConfigFile` is moved to `yaml.NewConfig`.
`grpclog.Logger` has been deprecated in favor of [`grpclog.LoggerV2`](https://github.com/grpc/grpc-go/blob/master/grpclog/loggerv2.go). `clientv3.Logger` is now `grpclog.LoggerV2`.
Before
```go
import "github.com/coreos/etcd/clientv3"
clientv3.NewFromConfigFile
clientv3.SetLogger(log.New(os.Stderr, "grpc: ", 0))
```
After
```go
import clientv3yaml "github.com/coreos/etcd/clientv3/yaml"
clientv3yaml.NewConfig
import "github.com/coreos/etcd/clientv3"
import "google.golang.org/grpc/grpclog"
clientv3.SetLogger(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
// log.New above cannot be used (not implement grpclog.LoggerV2 interface)
```
### Client upgrade checklists (>=3.2.10)
##### Deprecate `grpc.ErrClientConnTimeout`
Note that >=3.2.10 requires `grpc/grpc-go` v1.7.4 (<=3.2.9 with v1.2.1), which introduces some breaking changes.
Previously, `grpc.ErrClientConnTimeout` error is returned on client dial time-outs. >=3.2.10 instead returns `context.DeadlineExceeded` (see [#8504](https://github.com/coreos/etcd/issues/8504)).
Previously, `grpc.ErrClientConnTimeout` error is returned on client dial time-outs. 3.2 instead returns `context.DeadlineExceeded` (see [#8504](https://github.com/coreos/etcd/issues/8504)).
Before
@ -83,6 +64,148 @@ if err == context.DeadlineExceeded {
}
```
#### Change in maximum request size limits (>=3.2.10)
3.2.10 and 3.2.11 allow custom request size limits in server side. >=3.2.12 allows custom request size limits for both server and **client side**.
Server-side request limits can be configured with `--max-request-bytes` flag:
```bash
# limits request size to 1.5 KiB
etcd --max-request-bytes 1536
# client writes exceeding 1.5 KiB will be rejected
etcdctl put foo [LARGE VALUE...]
# etcdserver: request is too large
```
Or configure `embed.Config.MaxRequestBytes` field:
```go
import "github.com/coreos/etcd/embed"
import "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
// limit requests to 5 MiB
cfg := embed.NewConfig()
cfg.MaxRequestBytes = 5 * 1024 * 1024
// client writes exceeding 5 MiB will be rejected
_, err := cli.Put(ctx, "foo", [LARGE VALUE...])
err == rpctypes.ErrRequestTooLarge
```
**If not specified, server-side limit defaults to 1.5 MiB**.
Client-side request limits must be configured based on server-side limits.
```bash
# limits request size to 1 MiB
etcd --max-request-bytes 1048576
```
```go
import "github.com/coreos/etcd/clientv3"
cli, _ := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
MaxCallSendMsgSize: 2 * 1024 * 1024,
MaxCallRecvMsgSize: 3 * 1024 * 1024,
})
// client writes exceeding "--max-request-bytes" will be rejected from etcd server
_, err := cli.Put(ctx, "foo", strings.Repeat("a", 1*1024*1024+5))
err == rpctypes.ErrRequestTooLarge
// client writes exceeding "MaxCallSendMsgSize" will be rejected from client-side
_, err = cli.Put(ctx, "foo", strings.Repeat("a", 5*1024*1024))
err.Error() == "rpc error: code = ResourceExhausted desc = grpc: trying to send message larger than max (5242890 vs. 2097152)"
// some writes under limits
for i := range []int{0,1,2,3,4} {
_, err = cli.Put(ctx, fmt.Sprintf("foo%d", i), strings.Repeat("a", 1*1024*1024-500))
if err != nil {
panic(err)
}
}
// client reads exceeding "MaxCallRecvMsgSize" will be rejected from client-side
_, err = cli.Get(ctx, "foo", clientv3.WithPrefix())
err.Error() == "rpc error: code = ResourceExhausted desc = grpc: received message larger than max (5240509 vs. 3145728)"
```
**If not specified, client-side send limit defaults to 2 MiB (1.5 MiB + gRPC overhead bytes) and receive limit to `math.MaxInt32`**. Please see [clientv3 godoc](https://godoc.org/github.com/coreos/etcd/clientv3#Config) for more detail.
#### Change in raw gRPC client wrappers
3.2.12 or later changes the function signatures of `clientv3` gRPC client wrapper. This change was needed to support [custom `grpc.CallOption` on message size limits](https://github.com/coreos/etcd/pull/9047).
Before and after
```diff
-func NewKVFromKVClient(remote pb.KVClient) KV {
+func NewKVFromKVClient(remote pb.KVClient, c *Client) KV {
-func NewClusterFromClusterClient(remote pb.ClusterClient) Cluster {
+func NewClusterFromClusterClient(remote pb.ClusterClient, c *Client) Cluster {
-func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Duration) Lease {
+func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout time.Duration) Lease {
-func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient) Maintenance {
+func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient, c *Client) Maintenance {
-func NewWatchFromWatchClient(wc pb.WatchClient) Watcher {
+func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
```
#### Change in `--listen-peer-urls` and `--listen-client-urls`
3.2 now rejects domains names for `--listen-peer-urls` and `--listen-client-urls` (3.1 only prints out warnings), since domain name is invalid for network interface binding. Make sure that those URLs are properly formated as `scheme://IP:port`.
See [issue #6336](https://github.com/coreos/etcd/issues/6336) for more contexts.
#### Change in `clientv3.Lease.TimeToLive` API
Previously, `clientv3.Lease.TimeToLive` API returned `lease.ErrLeaseNotFound` on non-existent lease ID. 3.2 instead returns TTL=-1 in its response and no error (see [#7305](https://github.com/coreos/etcd/pull/7305)).
Before
```go
// when leaseID does not exist
resp, err := TimeToLive(ctx, leaseID)
resp == nil
err == lease.ErrLeaseNotFound
```
After
```go
// when leaseID does not exist
resp, err := TimeToLive(ctx, leaseID)
resp.TTL == -1
err == nil
```
#### Change in `clientv3.NewFromConfigFile`
`clientv3.NewFromConfigFile` is moved to `yaml.NewConfig`.
Before
```go
import "github.com/coreos/etcd/clientv3"
clientv3.NewFromConfigFile
```
After
```go
import clientv3yaml "github.com/coreos/etcd/clientv3/yaml"
clientv3yaml.NewConfig
```
### Server upgrade checklists
#### Upgrade requirements

View File

@ -111,23 +111,103 @@ curl -L http://localhost:2379/v3beta/kv/put \
Requests to `/v3alpha` endpoints will redirect to `/v3beta`, and `/v3alpha` will be removed in 3.4 release.
#### `gcr.io/etcd-development/etcd` as primary container registry
#### Change in maximum request size limits
etcd uses [`gcr.io/etcd-development/etcd`](https://gcr.io/etcd-development/etcd) as a primary container registry, and [`quay.io/coreos/etcd`](https://quay.io/coreos/etcd) as secondary.
3.3 now allows custom request size limits for both server and **client side**.
Before
Server-side request limits can be configured with `--max-request-bytes` flag:
```bash
docker pull quay.io/coreos/etcd:v3.2.5
# limits request size to 1.5 KiB
etcd --max-request-bytes 1536
# client writes exceeding 1.5 KiB will be rejected
etcdctl put foo [LARGE VALUE...]
# etcdserver: request is too large
```
After
Or configure `embed.Config.MaxRequestBytes` field:
```go
import "github.com/coreos/etcd/embed"
import "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
// limit requests to 5 MiB
cfg := embed.NewConfig()
cfg.MaxRequestBytes = 5 * 1024 * 1024
// client writes exceeding 5 MiB will be rejected
_, err := cli.Put(ctx, "foo", [LARGE VALUE...])
err == rpctypes.ErrRequestTooLarge
```
**If not specified, server-side limit defaults to 1.5 MiB**.
Client-side request limits must be configured based on server-side limits.
```bash
docker pull gcr.io/etcd-development/etcd:v3.3.0
# limits request size to 1 MiB
etcd --max-request-bytes 1048576
```
#### Change in `Snapshot` API error type
```go
import "github.com/coreos/etcd/clientv3"
cli, _ := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
MaxCallSendMsgSize: 2 * 1024 * 1024,
MaxCallRecvMsgSize: 3 * 1024 * 1024,
})
// client writes exceeding "--max-request-bytes" will be rejected from etcd server
_, err := cli.Put(ctx, "foo", strings.Repeat("a", 1*1024*1024+5))
err == rpctypes.ErrRequestTooLarge
// client writes exceeding "MaxCallSendMsgSize" will be rejected from client-side
_, err = cli.Put(ctx, "foo", strings.Repeat("a", 5*1024*1024))
err.Error() == "rpc error: code = ResourceExhausted desc = grpc: trying to send message larger than max (5242890 vs. 2097152)"
// some writes under limits
for i := range []int{0,1,2,3,4} {
_, err = cli.Put(ctx, fmt.Sprintf("foo%d", i), strings.Repeat("a", 1*1024*1024-500))
if err != nil {
panic(err)
}
}
// client reads exceeding "MaxCallRecvMsgSize" will be rejected from client-side
_, err = cli.Get(ctx, "foo", clientv3.WithPrefix())
err.Error() == "rpc error: code = ResourceExhausted desc = grpc: received message larger than max (5240509 vs. 3145728)"
```
**If not specified, client-side send limit defaults to 2 MiB (1.5 MiB + gRPC overhead bytes) and receive limit to `math.MaxInt32`**. Please see [clientv3 godoc](https://godoc.org/github.com/coreos/etcd/clientv3#Config) for more detail.
#### Change in raw gRPC client wrappers
3.3 changes the function signatures of `clientv3` gRPC client wrapper. This change was needed to support [custom `grpc.CallOption` on message size limits](https://github.com/coreos/etcd/pull/9047).
Before and after
```diff
-func NewKVFromKVClient(remote pb.KVClient) KV {
+func NewKVFromKVClient(remote pb.KVClient, c *Client) KV {
-func NewClusterFromClusterClient(remote pb.ClusterClient) Cluster {
+func NewClusterFromClusterClient(remote pb.ClusterClient, c *Client) Cluster {
-func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Duration) Lease {
+func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout time.Duration) Lease {
-func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient) Maintenance {
+func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient, c *Client) Maintenance {
-func NewWatchFromWatchClient(wc pb.WatchClient) Watcher {
+func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
```
#### Change in clientv3 `Snapshot` API error type
Previously, clientv3 `Snapshot` API returned raw [`grpc/*status.statusError`] type error. v3.3 now translates those errors to corresponding public error types, to be consistent with other APIs.
@ -173,7 +253,7 @@ _, err = io.Copy(f, rc)
err == context.DeadlineExceeded
```
#### Deprecate `golang.org/x/net/context` imports
#### Change in `golang.org/x/net/context` imports
`clientv3` has deprecated `golang.org/x/net/context`. If a project vendors `golang.org/x/net/context` in other code (e.g. etcd generated protocol buffer code) and imports `github.com/coreos/etcd/clientv3`, it requires Go 1.9+ to compile.
@ -191,9 +271,9 @@ import "context"
cli.Put(context.Background(), "f", "v")
```
#### Upgrade grpc/grpc-go to `v1.7.4`
#### Change in gRPC dependency
3.3 now requires [grpc/grpc-go](https://github.com/grpc/grpc-go/releases) `v1.7.4`.
3.3 now requires [grpc/grpc-go](https://github.com/grpc/grpc-go/releases) `v1.7.5`.
##### Deprecate `grpclog.Logger`
@ -245,6 +325,22 @@ if err == context.DeadlineExceeded {
}
```
#### Change in official container registry
etcd now uses [`gcr.io/etcd-development/etcd`](https://gcr.io/etcd-development/etcd) as a primary container registry, and [`quay.io/coreos/etcd`](https://quay.io/coreos/etcd) as secondary.
Before
```bash
docker pull quay.io/coreos/etcd:v3.2.5
```
After
```bash
docker pull gcr.io/etcd-development/etcd:v3.3.0
```
### Server upgrade checklists
#### Upgrade requirements

View File

@ -101,60 +101,65 @@ type Auth interface {
}
type auth struct {
remote pb.AuthClient
remote pb.AuthClient
callOpts []grpc.CallOption
}
func NewAuth(c *Client) Auth {
return &auth{remote: RetryAuthClient(c)}
api := &auth{remote: RetryAuthClient(c)}
if c != nil {
api.callOpts = c.callOpts
}
return api
}
func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) {
resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{})
resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, auth.callOpts...)
return (*AuthEnableResponse)(resp), toErr(ctx, err)
}
func (auth *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) {
resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{})
resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, auth.callOpts...)
return (*AuthDisableResponse)(resp), toErr(ctx, err)
}
func (auth *auth) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) {
resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password})
resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}, auth.callOpts...)
return (*AuthUserAddResponse)(resp), toErr(ctx, err)
}
func (auth *auth) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) {
resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name})
resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}, auth.callOpts...)
return (*AuthUserDeleteResponse)(resp), toErr(ctx, err)
}
func (auth *auth) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) {
resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password})
resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}, auth.callOpts...)
return (*AuthUserChangePasswordResponse)(resp), toErr(ctx, err)
}
func (auth *auth) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) {
resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role})
resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}, auth.callOpts...)
return (*AuthUserGrantRoleResponse)(resp), toErr(ctx, err)
}
func (auth *auth) UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error) {
resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name})
resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}, auth.callOpts...)
return (*AuthUserGetResponse)(resp), toErr(ctx, err)
}
func (auth *auth) UserList(ctx context.Context) (*AuthUserListResponse, error) {
resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{})
resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{}, auth.callOpts...)
return (*AuthUserListResponse)(resp), toErr(ctx, err)
}
func (auth *auth) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) {
resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role})
resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}, auth.callOpts...)
return (*AuthUserRevokeRoleResponse)(resp), toErr(ctx, err)
}
func (auth *auth) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) {
resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name})
resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}, auth.callOpts...)
return (*AuthRoleAddResponse)(resp), toErr(ctx, err)
}
@ -164,27 +169,27 @@ func (auth *auth) RoleGrantPermission(ctx context.Context, name string, key, ran
RangeEnd: []byte(rangeEnd),
PermType: authpb.Permission_Type(permType),
}
resp, err := auth.remote.RoleGrantPermission(ctx, &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: perm})
resp, err := auth.remote.RoleGrantPermission(ctx, &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: perm}, auth.callOpts...)
return (*AuthRoleGrantPermissionResponse)(resp), toErr(ctx, err)
}
func (auth *auth) RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error) {
resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role})
resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}, auth.callOpts...)
return (*AuthRoleGetResponse)(resp), toErr(ctx, err)
}
func (auth *auth) RoleList(ctx context.Context) (*AuthRoleListResponse, error) {
resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{})
resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{}, auth.callOpts...)
return (*AuthRoleListResponse)(resp), toErr(ctx, err)
}
func (auth *auth) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) {
resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd})
resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd}, auth.callOpts...)
return (*AuthRoleRevokePermissionResponse)(resp), toErr(ctx, err)
}
func (auth *auth) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) {
resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role})
resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}, auth.callOpts...)
return (*AuthRoleDeleteResponse)(resp), toErr(ctx, err)
}
@ -197,12 +202,13 @@ func StrToPermissionType(s string) (PermissionType, error) {
}
type authenticator struct {
conn *grpc.ClientConn // conn in-use
remote pb.AuthClient
conn *grpc.ClientConn // conn in-use
remote pb.AuthClient
callOpts []grpc.CallOption
}
func (auth *authenticator) authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) {
resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password})
resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}, auth.callOpts...)
return (*AuthenticateResponse)(resp), toErr(ctx, err)
}
@ -210,14 +216,18 @@ func (auth *authenticator) close() {
auth.conn.Close()
}
func newAuthenticator(endpoint string, opts []grpc.DialOption) (*authenticator, error) {
func newAuthenticator(endpoint string, opts []grpc.DialOption, c *Client) (*authenticator, error) {
conn, err := grpc.Dial(endpoint, opts...)
if err != nil {
return nil, err
}
return &authenticator{
api := &authenticator{
conn: conn,
remote: pb.NewAuthClient(conn),
}, nil
}
if c != nil {
api.callOpts = c.callOpts
}
return api, nil
}

View File

@ -67,6 +67,8 @@ type Client struct {
Password string
// tokenCred is an instance of WithPerRPCCredentials()'s argument
tokenCred *authTokenCredential
callOpts []grpc.CallOption
}
// New creates a new etcdv3 client from a given configuration.
@ -295,7 +297,7 @@ func (c *Client) getToken(ctx context.Context) error {
endpoint := c.cfg.Endpoints[i]
host := getHost(endpoint)
// use dial options without dopts to avoid reusing the client balancer
auth, err = newAuthenticator(host, c.dialSetupOpts(endpoint))
auth, err = newAuthenticator(host, c.dialSetupOpts(endpoint), c)
if err != nil {
continue
}
@ -386,11 +388,29 @@ func newClient(cfg *Config) (*Client, error) {
ctx: ctx,
cancel: cancel,
mu: new(sync.Mutex),
callOpts: defaultCallOpts,
}
if cfg.Username != "" && cfg.Password != "" {
client.Username = cfg.Username
client.Password = cfg.Password
}
if cfg.MaxCallSendMsgSize > 0 || cfg.MaxCallRecvMsgSize > 0 {
if cfg.MaxCallRecvMsgSize > 0 && cfg.MaxCallSendMsgSize > cfg.MaxCallRecvMsgSize {
return nil, fmt.Errorf("gRPC message recv limit (%d bytes) must be greater than send limit (%d bytes)", cfg.MaxCallRecvMsgSize, cfg.MaxCallSendMsgSize)
}
callOpts := []grpc.CallOption{
defaultFailFast,
defaultMaxCallSendMsgSize,
defaultMaxCallRecvMsgSize,
}
if cfg.MaxCallSendMsgSize > 0 {
callOpts[1] = grpc.MaxCallSendMsgSize(cfg.MaxCallSendMsgSize)
}
if cfg.MaxCallRecvMsgSize > 0 {
callOpts[2] = grpc.MaxCallRecvMsgSize(cfg.MaxCallRecvMsgSize)
}
client.callOpts = callOpts
}
client.balancer = newHealthBalancer(cfg.Endpoints, cfg.DialTimeout, func(ep string) (bool, error) {
return grpcHealthCheck(client, ep)

View File

@ -18,6 +18,8 @@ import (
"context"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"google.golang.org/grpc"
)
type (
@ -43,20 +45,29 @@ type Cluster interface {
}
type cluster struct {
remote pb.ClusterClient
remote pb.ClusterClient
callOpts []grpc.CallOption
}
func NewCluster(c *Client) Cluster {
return &cluster{remote: RetryClusterClient(c)}
api := &cluster{remote: RetryClusterClient(c)}
if c != nil {
api.callOpts = c.callOpts
}
return api
}
func NewClusterFromClusterClient(remote pb.ClusterClient) Cluster {
return &cluster{remote: remote}
func NewClusterFromClusterClient(remote pb.ClusterClient, c *Client) Cluster {
api := &cluster{remote: remote}
if c != nil {
api.callOpts = c.callOpts
}
return api
}
func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
r := &pb.MemberAddRequest{PeerURLs: peerAddrs}
resp, err := c.remote.MemberAdd(ctx, r)
resp, err := c.remote.MemberAdd(ctx, r, c.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
}
@ -65,7 +76,7 @@ func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAdd
func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) {
r := &pb.MemberRemoveRequest{ID: id}
resp, err := c.remote.MemberRemove(ctx, r)
resp, err := c.remote.MemberRemove(ctx, r, c.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
}
@ -75,7 +86,7 @@ func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveRes
func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) {
// it is safe to retry on update.
r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs}
resp, err := c.remote.MemberUpdate(ctx, r)
resp, err := c.remote.MemberUpdate(ctx, r, c.callOpts...)
if err == nil {
return (*MemberUpdateResponse)(resp), nil
}
@ -84,7 +95,7 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin
func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
// it is safe to retry on list.
resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{})
resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{}, c.callOpts...)
if err == nil {
return (*MemberListResponse)(resp), nil
}

View File

@ -41,6 +41,19 @@ type Config struct {
// keep-alive probe. If the response is not received in this time, the connection is closed.
DialKeepAliveTimeout time.Duration `json:"dial-keep-alive-timeout"`
// MaxCallSendMsgSize is the client-side request send limit in bytes.
// If 0, it defaults to 2.0 MiB (2 * 1024 * 1024).
// Make sure that "MaxCallSendMsgSize" < server-side default send/recv limit.
// ("--max-request-bytes" flag to etcd or "embed.Config.MaxRequestBytes").
MaxCallSendMsgSize int
// MaxCallRecvMsgSize is the client-side response receive limit.
// If 0, it defaults to "math.MaxInt32", because range response can
// easily exceed request send limits.
// Make sure that "MaxCallRecvMsgSize" >= server-side default send/recv limit.
// ("--max-request-bytes" flag to etcd or "embed.Config.MaxRequestBytes").
MaxCallRecvMsgSize int
// TLS holds the client secure credentials, if any.
TLS *tls.Config

46
clientv3/grpc_options.go Normal file
View File

@ -0,0 +1,46 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clientv3
import (
"math"
"google.golang.org/grpc"
)
var (
// Disable gRPC internal retrial logic
// TODO: enable when gRPC retry is stable (FailFast=false)
// Reference:
// - https://github.com/grpc/grpc-go/issues/1532
// - https://github.com/grpc/proposal/blob/master/A6-client-retries.md
defaultFailFast = grpc.FailFast(true)
// client-side request send limit, gRPC default is math.MaxInt32
// Make sure that "client-side send limit < server-side default send/recv limit"
// Same value as "embed.DefaultMaxRequestBytes" plus gRPC overhead bytes
defaultMaxCallSendMsgSize = grpc.MaxCallSendMsgSize(2 * 1024 * 1024)
// client-side response receive limit, gRPC default is 4MB
// Make sure that "client-side receive limit >= server-side default send/recv limit"
// because range response can easily exceed request send limits
// Default to math.MaxInt32; writes exceeding server-side send limit fails anyway
defaultMaxCallRecvMsgSize = grpc.MaxCallRecvMsgSize(math.MaxInt32)
)
// defaultCallOpts defines a list of default "gRPC.CallOption".
// Some options are exposed to "clientv3.Config".
// Defaults will be overridden by the settings in "clientv3.Config".
var defaultCallOpts = []grpc.CallOption{defaultFailFast, defaultMaxCallSendMsgSize, defaultMaxCallRecvMsgSize}

View File

@ -182,7 +182,7 @@ func TestDialForeignEndpoint(t *testing.T) {
// grpc can return a lazy connection that's not connected yet; confirm
// that it can communicate with the cluster.
kvc := clientv3.NewKVFromKVClient(pb.NewKVClient(conn))
kvc := clientv3.NewKVFromKVClient(pb.NewKVClient(conn), clus.Client(0))
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
if _, gerr := kvc.Get(ctx, "abc"); gerr != nil {

View File

@ -30,6 +30,7 @@ import (
"github.com/coreos/etcd/pkg/testutil"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
func TestKVPutError(t *testing.T) {
@ -39,7 +40,7 @@ func TestKVPutError(t *testing.T) {
maxReqBytes = 1.5 * 1024 * 1024 // hard coded max in v3_server.go
quota = int64(int(maxReqBytes) + 8*os.Getpagesize())
)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, QuotaBackendBytes: quota})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, QuotaBackendBytes: quota, ClientMaxCallSendMsgSize: 100 * 1024 * 1024})
defer clus.Terminate(t)
kv := clus.RandClient()
@ -861,3 +862,95 @@ func TestKVPutAtMostOnce(t *testing.T) {
t.Fatalf("expected version <= 10, got %+v", resp.Kvs[0])
}
}
// TestKVLargeRequests tests various client/server side request limits.
func TestKVLargeRequests(t *testing.T) {
defer testutil.AfterTest(t)
tests := []struct {
// make sure that "MaxCallSendMsgSize" < server-side default send/recv limit
maxRequestBytesServer uint
maxCallSendBytesClient int
maxCallRecvBytesClient int
valueSize int
expectError error
}{
{
maxRequestBytesServer: 1,
maxCallSendBytesClient: 0,
maxCallRecvBytesClient: 0,
valueSize: 1024,
expectError: rpctypes.ErrRequestTooLarge,
},
// without proper client-side receive size limit
// "code = ResourceExhausted desc = grpc: received message larger than max (5242929 vs. 4194304)"
{
maxRequestBytesServer: 7*1024*1024 + 512*1024,
maxCallSendBytesClient: 7 * 1024 * 1024,
maxCallRecvBytesClient: 0,
valueSize: 5 * 1024 * 1024,
expectError: nil,
},
{
maxRequestBytesServer: 10 * 1024 * 1024,
maxCallSendBytesClient: 100 * 1024 * 1024,
maxCallRecvBytesClient: 0,
valueSize: 10 * 1024 * 1024,
expectError: rpctypes.ErrRequestTooLarge,
},
{
maxRequestBytesServer: 10 * 1024 * 1024,
maxCallSendBytesClient: 10 * 1024 * 1024,
maxCallRecvBytesClient: 0,
valueSize: 10 * 1024 * 1024,
expectError: grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", 10485770, 10485760),
},
{
maxRequestBytesServer: 10 * 1024 * 1024,
maxCallSendBytesClient: 100 * 1024 * 1024,
maxCallRecvBytesClient: 0,
valueSize: 10*1024*1024 + 5,
expectError: rpctypes.ErrRequestTooLarge,
},
{
maxRequestBytesServer: 10 * 1024 * 1024,
maxCallSendBytesClient: 10 * 1024 * 1024,
maxCallRecvBytesClient: 0,
valueSize: 10*1024*1024 + 5,
expectError: grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", 10485775, 10485760),
},
}
for i, test := range tests {
clus := integration.NewClusterV3(t,
&integration.ClusterConfig{
Size: 1,
MaxRequestBytes: test.maxRequestBytesServer,
ClientMaxCallSendMsgSize: test.maxCallSendBytesClient,
ClientMaxCallRecvMsgSize: test.maxCallRecvBytesClient,
},
)
cli := clus.Client(0)
_, err := cli.Put(context.TODO(), "foo", strings.Repeat("a", test.valueSize))
if _, ok := err.(rpctypes.EtcdError); ok {
if err != test.expectError {
t.Errorf("#%d: expected %v, got %v", i, test.expectError, err)
}
} else if err != nil && err.Error() != test.expectError.Error() {
t.Errorf("#%d: expected %v, got %v", i, test.expectError, err)
}
// put request went through, now expects large response back
if err == nil {
_, err = cli.Get(context.TODO(), "foo")
if err != nil {
t.Errorf("#%d: get expected no error, got %v", i, err)
}
}
clus.Terminate(t)
}
}

View File

@ -18,6 +18,8 @@ import (
"context"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"google.golang.org/grpc"
)
type (
@ -88,15 +90,24 @@ func (resp *TxnResponse) OpResponse() OpResponse {
}
type kv struct {
remote pb.KVClient
remote pb.KVClient
callOpts []grpc.CallOption
}
func NewKV(c *Client) KV {
return &kv{remote: RetryKVClient(c)}
api := &kv{remote: RetryKVClient(c)}
if c != nil {
api.callOpts = c.callOpts
}
return api
}
func NewKVFromKVClient(remote pb.KVClient) KV {
return &kv{remote: remote}
func NewKVFromKVClient(remote pb.KVClient, c *Client) KV {
api := &kv{remote: remote}
if c != nil {
api.callOpts = c.callOpts
}
return api
}
func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
@ -115,7 +126,7 @@ func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*Delete
}
func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error) {
resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest())
resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest(), kv.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
}
@ -124,8 +135,9 @@ func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*C
func (kv *kv) Txn(ctx context.Context) Txn {
return &txn{
kv: kv,
ctx: ctx,
kv: kv,
ctx: ctx,
callOpts: kv.callOpts,
}
}
@ -134,27 +146,27 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
switch op.t {
case tRange:
var resp *pb.RangeResponse
resp, err = kv.remote.Range(ctx, op.toRangeRequest())
resp, err = kv.remote.Range(ctx, op.toRangeRequest(), kv.callOpts...)
if err == nil {
return OpResponse{get: (*GetResponse)(resp)}, nil
}
case tPut:
var resp *pb.PutResponse
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
resp, err = kv.remote.Put(ctx, r)
resp, err = kv.remote.Put(ctx, r, kv.callOpts...)
if err == nil {
return OpResponse{put: (*PutResponse)(resp)}, nil
}
case tDeleteRange:
var resp *pb.DeleteRangeResponse
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
resp, err = kv.remote.DeleteRange(ctx, r)
resp, err = kv.remote.DeleteRange(ctx, r, kv.callOpts...)
if err == nil {
return OpResponse{del: (*DeleteResponse)(resp)}, nil
}
case tTxn:
var resp *pb.TxnResponse
resp, err = kv.remote.Txn(ctx, op.toTxnRequest())
resp, err = kv.remote.Txn(ctx, op.toTxnRequest(), kv.callOpts...)
if err == nil {
return OpResponse{txn: (*TxnResponse)(resp)}, nil
}

View File

@ -22,6 +22,7 @@ import (
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
@ -166,6 +167,8 @@ type lessor struct {
// firstKeepAliveOnce ensures stream starts after first KeepAlive call.
firstKeepAliveOnce sync.Once
callOpts []grpc.CallOption
}
// keepAlive multiplexes a keepalive for a lease over multiple channels
@ -181,10 +184,10 @@ type keepAlive struct {
}
func NewLease(c *Client) Lease {
return NewLeaseFromLeaseClient(RetryLeaseClient(c), c.cfg.DialTimeout+time.Second)
return NewLeaseFromLeaseClient(RetryLeaseClient(c), c, c.cfg.DialTimeout+time.Second)
}
func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Duration) Lease {
func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout time.Duration) Lease {
l := &lessor{
donec: make(chan struct{}),
keepAlives: make(map[LeaseID]*keepAlive),
@ -194,6 +197,9 @@ func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Durati
if l.firstKeepAliveTimeout == time.Second {
l.firstKeepAliveTimeout = defaultTTL
}
if c != nil {
l.callOpts = c.callOpts
}
reqLeaderCtx := WithRequireLeader(context.Background())
l.stopCtx, l.stopCancel = context.WithCancel(reqLeaderCtx)
return l
@ -201,7 +207,7 @@ func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Durati
func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
r := &pb.LeaseGrantRequest{TTL: ttl}
resp, err := l.remote.LeaseGrant(ctx, r)
resp, err := l.remote.LeaseGrant(ctx, r, l.callOpts...)
if err == nil {
gresp := &LeaseGrantResponse{
ResponseHeader: resp.GetHeader(),
@ -216,7 +222,7 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err
func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
r := &pb.LeaseRevokeRequest{ID: int64(id)}
resp, err := l.remote.LeaseRevoke(ctx, r)
resp, err := l.remote.LeaseRevoke(ctx, r, l.callOpts...)
if err == nil {
return (*LeaseRevokeResponse)(resp), nil
}
@ -225,7 +231,7 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse,
func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) {
r := toLeaseTimeToLiveRequest(id, opts...)
resp, err := l.remote.LeaseTimeToLive(ctx, r)
resp, err := l.remote.LeaseTimeToLive(ctx, r, l.callOpts...)
if err == nil {
gresp := &LeaseTimeToLiveResponse{
ResponseHeader: resp.GetHeader(),
@ -240,7 +246,7 @@ func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption
}
func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) {
resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{})
resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{}, l.callOpts...)
if err == nil {
leases := make([]LeaseStatus, len(resp.Leases))
for i := range resp.Leases {
@ -389,7 +395,7 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive
cctx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := l.remote.LeaseKeepAlive(cctx)
stream, err := l.remote.LeaseKeepAlive(cctx, l.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
}
@ -460,7 +466,7 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) {
// resetRecv opens a new lease stream and starts sending keep alive requests.
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
sctx, cancel := context.WithCancel(l.stopCtx)
stream, err := l.remote.LeaseKeepAlive(sctx)
stream, err := l.remote.LeaseKeepAlive(sctx, l.callOpts...)
if err != nil {
cancel()
return nil, err

View File

@ -19,6 +19,8 @@ import (
"io"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"google.golang.org/grpc"
)
type (
@ -63,12 +65,13 @@ type Maintenance interface {
}
type maintenance struct {
dial func(endpoint string) (pb.MaintenanceClient, func(), error)
remote pb.MaintenanceClient
dial func(endpoint string) (pb.MaintenanceClient, func(), error)
remote pb.MaintenanceClient
callOpts []grpc.CallOption
}
func NewMaintenance(c *Client) Maintenance {
return &maintenance{
api := &maintenance{
dial: func(endpoint string) (pb.MaintenanceClient, func(), error) {
conn, err := c.dial(endpoint)
if err != nil {
@ -79,15 +82,23 @@ func NewMaintenance(c *Client) Maintenance {
},
remote: RetryMaintenanceClient(c, c.conn),
}
if c != nil {
api.callOpts = c.callOpts
}
return api
}
func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient) Maintenance {
return &maintenance{
func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient, c *Client) Maintenance {
api := &maintenance{
dial: func(string) (pb.MaintenanceClient, func(), error) {
return remote, func() {}, nil
},
remote: remote,
}
if c != nil {
api.callOpts = c.callOpts
}
return api
}
func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
@ -96,7 +107,7 @@ func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
MemberID: 0, // all
Alarm: pb.AlarmType_NONE, // all
}
resp, err := m.remote.Alarm(ctx, req)
resp, err := m.remote.Alarm(ctx, req, m.callOpts...)
if err == nil {
return (*AlarmResponse)(resp), nil
}
@ -126,7 +137,7 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR
return &ret, nil
}
resp, err := m.remote.Alarm(ctx, req)
resp, err := m.remote.Alarm(ctx, req, m.callOpts...)
if err == nil {
return (*AlarmResponse)(resp), nil
}
@ -139,7 +150,7 @@ func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*Defragm
return nil, toErr(ctx, err)
}
defer cancel()
resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{})
resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}, m.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
}
@ -152,7 +163,7 @@ func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusRespo
return nil, toErr(ctx, err)
}
defer cancel()
resp, err := remote.Status(ctx, &pb.StatusRequest{})
resp, err := remote.Status(ctx, &pb.StatusRequest{}, m.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
}
@ -165,7 +176,7 @@ func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*
return nil, toErr(ctx, err)
}
defer cancel()
resp, err := remote.HashKV(ctx, &pb.HashKVRequest{Revision: rev})
resp, err := remote.HashKV(ctx, &pb.HashKVRequest{Revision: rev}, m.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
}
@ -173,7 +184,7 @@ func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*
}
func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{})
ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, m.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
}
@ -210,6 +221,6 @@ func (rc *snapshotReadCloser) Read(p []byte) (n int, err error) {
}
func (m *maintenance) MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error) {
resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID})
resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID}, m.callOpts...)
return (*MoveLeaderResponse)(resp), toErr(ctx, err)
}

View File

@ -204,7 +204,7 @@ var rangeTests = []struct {
func TestKvOrdering(t *testing.T) {
for i, tt := range rangeTests {
mKV := &mockKV{clientv3.NewKVFromKVClient(nil), tt.response.OpResponse()}
mKV := &mockKV{clientv3.NewKVFromKVClient(nil, nil), tt.response.OpResponse()}
kv := &kvOrdering{
mKV,
func(r *clientv3.GetResponse) OrderViolationFunc {
@ -258,7 +258,7 @@ var txnTests = []struct {
func TestTxnOrdering(t *testing.T) {
for i, tt := range txnTests {
mKV := &mockKV{clientv3.NewKVFromKVClient(nil), tt.response.OpResponse()}
mKV := &mockKV{clientv3.NewKVFromKVClient(nil, nil), tt.response.OpResponse()}
kv := &kvOrdering{
mKV,
func(r *clientv3.TxnResponse) OrderViolationFunc {

View File

@ -19,6 +19,8 @@ import (
"sync"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"google.golang.org/grpc"
)
// Txn is the interface that wraps mini-transactions.
@ -65,6 +67,8 @@ type txn struct {
sus []*pb.RequestOp
fas []*pb.RequestOp
callOpts []grpc.CallOption
}
func (txn *txn) If(cs ...Cmp) Txn {
@ -139,7 +143,7 @@ func (txn *txn) Commit() (*TxnResponse, error) {
var resp *pb.TxnResponse
var err error
resp, err = txn.kv.remote.Txn(txn.ctx, r)
resp, err = txn.kv.remote.Txn(txn.ctx, r, txn.callOpts...)
if err != nil {
return nil, toErr(txn.ctx, err)
}

View File

@ -106,7 +106,8 @@ func (wr *WatchResponse) IsProgressNotify() bool {
// watcher implements the Watcher interface
type watcher struct {
remote pb.WatchClient
remote pb.WatchClient
callOpts []grpc.CallOption
// mu protects the grpc streams map
mu sync.RWMutex
@ -117,8 +118,9 @@ type watcher struct {
// watchGrpcStream tracks all watch resources attached to a single grpc stream.
type watchGrpcStream struct {
owner *watcher
remote pb.WatchClient
owner *watcher
remote pb.WatchClient
callOpts []grpc.CallOption
// ctx controls internal remote.Watch requests
ctx context.Context
@ -189,14 +191,18 @@ type watcherStream struct {
}
func NewWatcher(c *Client) Watcher {
return NewWatchFromWatchClient(pb.NewWatchClient(c.conn))
return NewWatchFromWatchClient(pb.NewWatchClient(c.conn), c)
}
func NewWatchFromWatchClient(wc pb.WatchClient) Watcher {
return &watcher{
func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
w := &watcher{
remote: wc,
streams: make(map[string]*watchGrpcStream),
}
if c != nil {
w.callOpts = c.callOpts
}
return w
}
// never closes
@ -215,6 +221,7 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
wgs := &watchGrpcStream{
owner: w,
remote: w.remote,
callOpts: w.callOpts,
ctx: ctx,
ctxKey: streamKeyFromCtx(inctx),
cancel: cancel,
@ -775,7 +782,7 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error)
return nil, err
default:
}
if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil {
if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil {
break
}
if isHaltErr(w.ctx, err) {

View File

@ -31,19 +31,19 @@ func New(s *etcdserver.EtcdServer) *clientv3.Client {
c := clientv3.NewCtxClient(context.Background())
kvc := adapter.KvServerToKvClient(v3rpc.NewQuotaKVServer(s))
c.KV = clientv3.NewKVFromKVClient(kvc)
c.KV = clientv3.NewKVFromKVClient(kvc, c)
lc := adapter.LeaseServerToLeaseClient(v3rpc.NewQuotaLeaseServer(s))
c.Lease = clientv3.NewLeaseFromLeaseClient(lc, time.Second)
c.Lease = clientv3.NewLeaseFromLeaseClient(lc, c, time.Second)
wc := adapter.WatchServerToWatchClient(v3rpc.NewWatchServer(s))
c.Watcher = &watchWrapper{clientv3.NewWatchFromWatchClient(wc)}
c.Watcher = &watchWrapper{clientv3.NewWatchFromWatchClient(wc, c)}
mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s))
c.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(mc)
c.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(mc, c)
clc := adapter.ClusterServerToClusterClient(v3rpc.NewClusterServer(s))
c.Cluster = clientv3.NewClusterFromClusterClient(clc)
c.Cluster = clientv3.NewClusterFromClusterClient(clc, c)
// TODO: implement clientv3.Auth interface?

View File

@ -105,6 +105,9 @@ type ClusterConfig struct {
GRPCKeepAliveTimeout time.Duration
// SkipCreatingClient to skip creating clients for each member.
SkipCreatingClient bool
ClientMaxCallSendMsgSize int
ClientMaxCallRecvMsgSize int
}
type cluster struct {
@ -232,15 +235,17 @@ func (c *cluster) HTTPMembers() []client.Member {
func (c *cluster) mustNewMember(t *testing.T) *member {
m := mustNewMember(t,
memberConfig{
name: c.name(rand.Int()),
peerTLS: c.cfg.PeerTLS,
clientTLS: c.cfg.ClientTLS,
quotaBackendBytes: c.cfg.QuotaBackendBytes,
maxTxnOps: c.cfg.MaxTxnOps,
maxRequestBytes: c.cfg.MaxRequestBytes,
grpcKeepAliveMinTime: c.cfg.GRPCKeepAliveMinTime,
grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval,
grpcKeepAliveTimeout: c.cfg.GRPCKeepAliveTimeout,
name: c.name(rand.Int()),
peerTLS: c.cfg.PeerTLS,
clientTLS: c.cfg.ClientTLS,
quotaBackendBytes: c.cfg.QuotaBackendBytes,
maxTxnOps: c.cfg.MaxTxnOps,
maxRequestBytes: c.cfg.MaxRequestBytes,
grpcKeepAliveMinTime: c.cfg.GRPCKeepAliveMinTime,
grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval,
grpcKeepAliveTimeout: c.cfg.GRPCKeepAliveTimeout,
clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize,
clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize,
})
m.DiscoveryURL = c.cfg.DiscoveryURL
if c.cfg.UseGRPC {
@ -382,7 +387,7 @@ func (c *cluster) waitLeader(t *testing.T, membs []*member) int {
// ensure leader is up via linearizable get
for {
ctx, cancel := context.WithTimeout(context.Background(), 10*tickDuration)
ctx, cancel := context.WithTimeout(context.Background(), 10*tickDuration+time.Second)
_, err := kapi.Get(ctx, "0", &client.GetOptions{Quorum: true})
cancel()
if err == nil || strings.Contains(err.Error(), "Key not found") {
@ -501,21 +506,25 @@ type member struct {
// serverClient is a clientv3 that directly calls the etcdserver.
serverClient *clientv3.Client
keepDataDirTerminate bool
keepDataDirTerminate bool
clientMaxCallSendMsgSize int
clientMaxCallRecvMsgSize int
}
func (m *member) GRPCAddr() string { return m.grpcAddr }
type memberConfig struct {
name string
peerTLS *transport.TLSInfo
clientTLS *transport.TLSInfo
quotaBackendBytes int64
maxTxnOps uint
maxRequestBytes uint
grpcKeepAliveMinTime time.Duration
grpcKeepAliveInterval time.Duration
grpcKeepAliveTimeout time.Duration
name string
peerTLS *transport.TLSInfo
clientTLS *transport.TLSInfo
quotaBackendBytes int64
maxTxnOps uint
maxRequestBytes uint
grpcKeepAliveMinTime time.Duration
grpcKeepAliveInterval time.Duration
grpcKeepAliveTimeout time.Duration
clientMaxCallSendMsgSize int
clientMaxCallRecvMsgSize int
}
// mustNewMember return an inited member with the given name. If peerTLS is
@ -587,6 +596,8 @@ func mustNewMember(t *testing.T, mcfg memberConfig) *member {
Timeout: mcfg.grpcKeepAliveTimeout,
}))
}
m.clientMaxCallSendMsgSize = mcfg.clientMaxCallSendMsgSize
m.clientMaxCallRecvMsgSize = mcfg.clientMaxCallRecvMsgSize
m.InitialCorruptCheck = true
@ -630,8 +641,10 @@ func NewClientV3(m *member) (*clientv3.Client, error) {
}
cfg := clientv3.Config{
Endpoints: []string{m.grpcAddr},
DialTimeout: 5 * time.Second,
Endpoints: []string{m.grpcAddr},
DialTimeout: 5 * time.Second,
MaxCallSendMsgSize: m.clientMaxCallSendMsgSize,
MaxCallRecvMsgSize: m.clientMaxCallRecvMsgSize,
}
if m.ClientTLSInfo != nil {

View File

@ -99,12 +99,12 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) {
return nil, err
}
rpc := toGRPC(c)
c.KV = clientv3.NewKVFromKVClient(rpc.KV)
c.KV = clientv3.NewKVFromKVClient(rpc.KV, c)
pmu.Lock()
lc := c.Lease
c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, cfg.DialTimeout)
c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, c, cfg.DialTimeout)
c.Watcher = &proxyCloser{
Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch),
Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch, c),
wdonec: proxies[c].wdonec,
kvdonec: proxies[c].kvdonec,
lclose: func() { lc.Close() },

View File

@ -1874,7 +1874,7 @@ func TestV3LargeRequests(t *testing.T) {
// limit receive call size with original value + gRPC overhead bytes
_, err = kvcli.Range(context.TODO(), reqget, grpc.MaxCallRecvMsgSize(test.valueSize+512*1024))
if err != nil {
t.Errorf("#%d: range expected no error , got %v", i, err)
t.Errorf("#%d: range expected no error, got %v", i, err)
}
}