mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #5217 from gyuho/rpc_types
*: return rpctypes.Err in clientv3
This commit is contained in:
commit
4480eb6d49
@ -235,9 +235,9 @@ func dialEndpointList(c *Client) (*grpc.ClientConn, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// isHalted returns true if the given error and context indicate no forward
|
||||
// isHaltErr returns true if the given error and context indicate no forward
|
||||
// progress can be made, even after reconnecting.
|
||||
func isHalted(ctx context.Context, err error) bool {
|
||||
func isHaltErr(ctx context.Context, err error) bool {
|
||||
isRPCError := strings.HasPrefix(grpc.ErrorDesc(err), "etcdserver: ")
|
||||
return isRPCError || ctx.Err() != nil
|
||||
}
|
||||
|
@ -55,16 +55,16 @@ func TestDialTimeout(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsHalted(t *testing.T) {
|
||||
if !isHalted(nil, fmt.Errorf("etcdserver: some etcdserver error")) {
|
||||
func TestIsHaltErr(t *testing.T) {
|
||||
if !isHaltErr(nil, fmt.Errorf("etcdserver: some etcdserver error")) {
|
||||
t.Errorf(`error prefixed with "etcdserver: " should be Halted`)
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.TODO())
|
||||
if isHalted(ctx, nil) {
|
||||
if isHaltErr(ctx, nil) {
|
||||
t.Errorf("no error and active context should not be Halted")
|
||||
}
|
||||
cancel()
|
||||
if !isHalted(ctx, nil) {
|
||||
if !isHaltErr(ctx, nil) {
|
||||
t.Errorf("cancel on context should be Halted")
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ package clientv3
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
@ -70,12 +71,12 @@ func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAdd
|
||||
return (*MemberAddResponse)(resp), nil
|
||||
}
|
||||
|
||||
if isHalted(ctx, err) {
|
||||
return nil, err
|
||||
if isHaltErr(ctx, err) {
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
go c.switchRemote(err)
|
||||
return nil, err
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) {
|
||||
@ -85,12 +86,12 @@ func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveRes
|
||||
return (*MemberRemoveResponse)(resp), nil
|
||||
}
|
||||
|
||||
if isHalted(ctx, err) {
|
||||
return nil, err
|
||||
if isHaltErr(ctx, err) {
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
go c.switchRemote(err)
|
||||
return nil, err
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) {
|
||||
@ -102,13 +103,13 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin
|
||||
return (*MemberUpdateResponse)(resp), nil
|
||||
}
|
||||
|
||||
if isHalted(ctx, err) {
|
||||
return nil, err
|
||||
if isHaltErr(ctx, err) {
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
err = c.switchRemote(err)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -121,13 +122,13 @@ func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
|
||||
return (*MemberListResponse)(resp), nil
|
||||
}
|
||||
|
||||
if isHalted(ctx, err) {
|
||||
return nil, err
|
||||
if isHaltErr(ctx, err) {
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
err = c.switchRemote(err)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
44
clientv3/integration/auth_test.go
Normal file
44
clientv3/integration/auth_test.go
Normal file
@ -0,0 +1,44 @@
|
||||
// Copyright 2016 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package integration
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
"github.com/coreos/etcd/integration"
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
func TestAuthError(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
authapi := clientv3.NewAuth(clus.RandClient())
|
||||
|
||||
_, err := authapi.UserAdd(context.TODO(), "foo", "bar")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = authapi.Authenticate(context.TODO(), "foo", "bar111")
|
||||
if err != rpctypes.ErrAuthFailed {
|
||||
t.Fatalf("expected %v, got %v", rpctypes.ErrAuthFailed, err)
|
||||
}
|
||||
}
|
@ -17,6 +17,7 @@ package integration
|
||||
import (
|
||||
"bytes"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -28,6 +29,42 @@ import (
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
func TestKVPutError(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
var (
|
||||
maxReqBytes = 1.5 * 1024 * 1024
|
||||
quota = int64(maxReqBytes * 1.2)
|
||||
)
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, QuotaBackendBytes: quota})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
kv := clientv3.NewKV(clus.RandClient())
|
||||
ctx := context.TODO()
|
||||
|
||||
_, err := kv.Put(ctx, "", "bar")
|
||||
if err != rpctypes.ErrEmptyKey {
|
||||
t.Fatalf("expected %v, got %v", rpctypes.ErrEmptyKey, err)
|
||||
}
|
||||
|
||||
_, err = kv.Put(ctx, "key", strings.Repeat("a", int(maxReqBytes+100))) // 1.5MB
|
||||
if err != rpctypes.ErrRequestTooLarge {
|
||||
t.Fatalf("expected %v, got %v", rpctypes.ErrRequestTooLarge, err)
|
||||
}
|
||||
|
||||
_, err = kv.Put(ctx, "foo1", strings.Repeat("a", int(maxReqBytes-50)))
|
||||
if err != nil { // below quota
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
time.Sleep(500 * time.Millisecond) // give enough time for commit
|
||||
|
||||
_, err = kv.Put(ctx, "foo2", strings.Repeat("a", int(maxReqBytes-50)))
|
||||
if err != rpctypes.ErrNoSpace { // over quota
|
||||
t.Fatalf("expected %v, got %v", rpctypes.ErrNoSpace, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestKVPut(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
@ -323,6 +360,36 @@ func TestKVDelete(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestKVCompactError(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
kv := clientv3.NewKV(clus.RandClient())
|
||||
ctx := context.TODO()
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
if _, err := kv.Put(ctx, "foo", "bar"); err != nil {
|
||||
t.Fatalf("couldn't put 'foo' (%v)", err)
|
||||
}
|
||||
}
|
||||
err := kv.Compact(ctx, 6)
|
||||
if err != nil {
|
||||
t.Fatalf("couldn't compact 6 (%v)", err)
|
||||
}
|
||||
|
||||
err = kv.Compact(ctx, 6)
|
||||
if err != rpctypes.ErrCompacted {
|
||||
t.Fatalf("expected %v, got %v", rpctypes.ErrCompacted, err)
|
||||
}
|
||||
|
||||
err = kv.Compact(ctx, 100)
|
||||
if err != rpctypes.ErrFutureRev {
|
||||
t.Fatalf("expected %v, got %v", rpctypes.ErrFutureRev, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestKVCompact(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
|
@ -27,6 +27,23 @@ import (
|
||||
"google.golang.org/grpc/codes"
|
||||
)
|
||||
|
||||
func TestLeastNotFoundError(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
lapi := clientv3.NewLease(clus.RandClient())
|
||||
defer lapi.Close()
|
||||
|
||||
kv := clientv3.NewKV(clus.RandClient())
|
||||
|
||||
_, err := kv.Put(context.TODO(), "foo", "bar", clientv3.WithLease(clientv3.LeaseID(500)))
|
||||
if err != rpctypes.ErrLeaseNotFound {
|
||||
t.Fatalf("expected %v, got %v", rpctypes.ErrLeaseNotFound, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeaseGrant(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
|
44
clientv3/integration/role_test.go
Normal file
44
clientv3/integration/role_test.go
Normal file
@ -0,0 +1,44 @@
|
||||
// Copyright 2016 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package integration
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
"github.com/coreos/etcd/integration"
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
func TestRoleError(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
authapi := clientv3.NewAuth(clus.RandClient())
|
||||
|
||||
_, err := authapi.RoleAdd(context.TODO(), "test-role")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = authapi.RoleAdd(context.TODO(), "test-role")
|
||||
if err != rpctypes.ErrRoleAlreadyExist {
|
||||
t.Fatalf("expected %v, got %v", rpctypes.ErrRoleAlreadyExist, err)
|
||||
}
|
||||
}
|
@ -15,15 +15,42 @@
|
||||
package integration
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
"github.com/coreos/etcd/integration"
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
func TestTxnError(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
kv := clientv3.NewKV(clus.RandClient())
|
||||
ctx := context.TODO()
|
||||
|
||||
_, err := kv.Txn(ctx).Then(clientv3.OpPut("foo", "bar1"), clientv3.OpPut("foo", "bar2")).Commit()
|
||||
if err != rpctypes.ErrDuplicateKey {
|
||||
t.Fatalf("expected %v, got %v", rpctypes.ErrDuplicateKey, err)
|
||||
}
|
||||
|
||||
ops := make([]clientv3.Op, v3rpc.MaxOpsPerTxn+10)
|
||||
for i := range ops {
|
||||
ops[i] = clientv3.OpPut(fmt.Sprintf("foo%d", i), "")
|
||||
}
|
||||
_, err = kv.Txn(ctx).Then(ops...).Commit()
|
||||
if err != rpctypes.ErrTooManyOps {
|
||||
t.Fatalf("expected %v, got %v", rpctypes.ErrTooManyOps, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTxnWriteFail(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
|
54
clientv3/integration/user_test.go
Normal file
54
clientv3/integration/user_test.go
Normal file
@ -0,0 +1,54 @@
|
||||
// Copyright 2016 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package integration
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
"github.com/coreos/etcd/integration"
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
func TestUserError(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
authapi := clientv3.NewAuth(clus.RandClient())
|
||||
|
||||
_, err := authapi.UserAdd(context.TODO(), "foo", "bar")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = authapi.UserAdd(context.TODO(), "foo", "bar")
|
||||
if err != rpctypes.ErrUserAlreadyExist {
|
||||
t.Fatalf("expected %v, got %v", rpctypes.ErrUserAlreadyExist, err)
|
||||
}
|
||||
|
||||
_, err = authapi.UserDelete(context.TODO(), "not-exist-user")
|
||||
if err != rpctypes.ErrUserNotFound {
|
||||
t.Fatalf("expected %v, got %v", rpctypes.ErrUserNotFound, err)
|
||||
}
|
||||
|
||||
_, err = authapi.UserGrant(context.TODO(), "foo", "test-role-does-not-exist")
|
||||
if err != rpctypes.ErrRoleNotFound {
|
||||
t.Fatalf("expected %v, got %v", rpctypes.ErrRoleNotFound, err)
|
||||
}
|
||||
}
|
@ -17,6 +17,7 @@ package clientv3
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
@ -96,17 +97,17 @@ func NewKV(c *Client) KV {
|
||||
|
||||
func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
|
||||
r, err := kv.Do(ctx, OpPut(key, val, opts...))
|
||||
return r.put, err
|
||||
return r.put, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
func (kv *kv) Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) {
|
||||
r, err := kv.Do(ctx, OpGet(key, opts...))
|
||||
return r.get, err
|
||||
return r.get, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error) {
|
||||
r, err := kv.Do(ctx, OpDelete(key, opts...))
|
||||
return r.del, err
|
||||
return r.del, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
func (kv *kv) Compact(ctx context.Context, rev int64) error {
|
||||
@ -116,12 +117,12 @@ func (kv *kv) Compact(ctx context.Context, rev int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
if isHalted(ctx, err) {
|
||||
return err
|
||||
if isHaltErr(ctx, err) {
|
||||
return rpctypes.Error(err)
|
||||
}
|
||||
|
||||
go kv.switchRemote(err)
|
||||
return err
|
||||
return rpctypes.Error(err)
|
||||
}
|
||||
|
||||
func (kv *kv) Txn(ctx context.Context) Txn {
|
||||
@ -166,14 +167,14 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
|
||||
panic("Unknown op")
|
||||
}
|
||||
|
||||
if isHalted(ctx, err) {
|
||||
return OpResponse{}, err
|
||||
if isHaltErr(ctx, err) {
|
||||
return OpResponse{}, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
// do not retry on modifications
|
||||
if op.isWrite() {
|
||||
go kv.switchRemote(err)
|
||||
return OpResponse{}, err
|
||||
return OpResponse{}, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
if nerr := kv.switchRemote(err); nerr != nil {
|
||||
@ -192,7 +193,7 @@ func (kv *kv) switchRemote(prevErr error) error {
|
||||
|
||||
newConn, err := kv.c.retryConnection(kv.conn, prevErr)
|
||||
if err != nil {
|
||||
return err
|
||||
return rpctypes.Error(err)
|
||||
}
|
||||
|
||||
kv.conn = newConn
|
||||
|
@ -134,9 +134,10 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err
|
||||
}
|
||||
return gresp, nil
|
||||
}
|
||||
if isHalted(cctx, err) {
|
||||
return nil, err
|
||||
if isHaltErr(cctx, err) {
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
if nerr := l.switchRemoteAndStream(err); nerr != nil {
|
||||
return nil, nerr
|
||||
}
|
||||
@ -155,8 +156,8 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse,
|
||||
if err == nil {
|
||||
return (*LeaseRevokeResponse)(resp), nil
|
||||
}
|
||||
if isHalted(ctx, err) {
|
||||
return nil, err
|
||||
if isHaltErr(ctx, err) {
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
if nerr := l.switchRemoteAndStream(err); nerr != nil {
|
||||
@ -204,8 +205,8 @@ func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
if isHalted(ctx, err) {
|
||||
return resp, err
|
||||
if isHaltErr(ctx, err) {
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
nerr := l.switchRemoteAndStream(err)
|
||||
@ -259,17 +260,17 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive
|
||||
|
||||
stream, err := l.getRemote().LeaseKeepAlive(cctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
resp, rerr := stream.Recv()
|
||||
if rerr != nil {
|
||||
return nil, rerr
|
||||
return nil, rpctypes.Error(rerr)
|
||||
}
|
||||
|
||||
karesp := &LeaseKeepAliveResponse{
|
||||
@ -296,7 +297,7 @@ func (l *lessor) recvKeepAliveLoop() {
|
||||
for serr == nil {
|
||||
resp, err := stream.Recv()
|
||||
if err != nil {
|
||||
if isHalted(l.stopCtx, err) {
|
||||
if isHaltErr(l.stopCtx, err) {
|
||||
return
|
||||
}
|
||||
stream, serr = l.resetRecv()
|
||||
@ -411,7 +412,7 @@ func (l *lessor) switchRemoteAndStream(prevErr error) error {
|
||||
conn.Close()
|
||||
newConn, err = l.c.retryConnection(conn, prevErr)
|
||||
if err != nil {
|
||||
return err
|
||||
return rpctypes.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -436,7 +437,7 @@ func (l *lessor) newStream() error {
|
||||
stream, err := l.getRemote().LeaseKeepAlive(sctx)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return err
|
||||
return rpctypes.Error(err)
|
||||
}
|
||||
|
||||
l.mu.Lock()
|
||||
|
@ -18,6 +18,7 @@ import (
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
@ -81,8 +82,8 @@ func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
|
||||
if err == nil {
|
||||
return (*AlarmResponse)(resp), nil
|
||||
}
|
||||
if isHalted(ctx, err) {
|
||||
return nil, err
|
||||
if isHaltErr(ctx, err) {
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
if err = m.switchRemote(err); err != nil {
|
||||
return nil, err
|
||||
@ -100,13 +101,13 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR
|
||||
if req.MemberID == 0 && req.Alarm == pb.AlarmType_NONE {
|
||||
ar, err := m.AlarmList(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
ret := AlarmResponse{}
|
||||
for _, am := range ar.Alarms {
|
||||
dresp, derr := m.AlarmDisarm(ctx, (*AlarmMember)(am))
|
||||
if derr != nil {
|
||||
return nil, derr
|
||||
return nil, rpctypes.Error(derr)
|
||||
}
|
||||
ret.Alarms = append(ret.Alarms, dresp.Alarms...)
|
||||
}
|
||||
@ -117,21 +118,21 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR
|
||||
if err == nil {
|
||||
return (*AlarmResponse)(resp), nil
|
||||
}
|
||||
if !isHalted(ctx, err) {
|
||||
if isHaltErr(ctx, err) {
|
||||
go m.switchRemote(err)
|
||||
}
|
||||
return nil, err
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error) {
|
||||
conn, err := m.c.Dial(endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
remote := pb.NewMaintenanceClient(conn)
|
||||
resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
return (*DefragmentResponse)(resp), nil
|
||||
}
|
||||
@ -139,12 +140,12 @@ func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*Defragm
|
||||
func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusResponse, error) {
|
||||
conn, err := m.c.Dial(endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
remote := pb.NewMaintenanceClient(conn)
|
||||
resp, err := remote.Status(ctx, &pb.StatusRequest{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
return (*StatusResponse)(resp), nil
|
||||
}
|
||||
@ -152,7 +153,7 @@ func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusRespo
|
||||
func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
|
||||
ss, err := m.getRemote().Snapshot(ctx, &pb.SnapshotRequest{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
pr, pw := io.Pipe()
|
||||
@ -187,7 +188,7 @@ func (m *maintenance) switchRemote(prevErr error) error {
|
||||
defer m.mu.Unlock()
|
||||
newConn, err := m.c.retryConnection(m.conn, prevErr)
|
||||
if err != nil {
|
||||
return err
|
||||
return rpctypes.Error(err)
|
||||
}
|
||||
m.conn = newConn
|
||||
m.remote = pb.NewMaintenanceClient(m.conn)
|
||||
|
@ -17,6 +17,7 @@ package clientv3
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
@ -146,13 +147,13 @@ func (txn *txn) Commit() (*TxnResponse, error) {
|
||||
return (*TxnResponse)(resp), nil
|
||||
}
|
||||
|
||||
if isHalted(txn.ctx, err) {
|
||||
return nil, err
|
||||
if isHaltErr(txn.ctx, err) {
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
if txn.isWrite {
|
||||
go kv.switchRemote(err)
|
||||
return nil, err
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
if nerr := kv.switchRemote(err); nerr != nil {
|
||||
|
@ -209,7 +209,7 @@ func (w *watcher) Close() error {
|
||||
case <-w.donec:
|
||||
}
|
||||
<-w.donec
|
||||
return <-w.errc
|
||||
return v3rpc.Error(<-w.errc)
|
||||
}
|
||||
|
||||
func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
|
||||
@ -496,7 +496,7 @@ func (w *watcher) resume() (ws pb.Watch_WatchClient, err error) {
|
||||
break
|
||||
}
|
||||
}
|
||||
return ws, err
|
||||
return ws, v3rpc.Error(err)
|
||||
}
|
||||
|
||||
// openWatchClient retries opening a watchclient until retryConnection fails
|
||||
@ -504,8 +504,8 @@ func (w *watcher) openWatchClient() (ws pb.Watch_WatchClient, err error) {
|
||||
for {
|
||||
if ws, err = w.remote.Watch(w.ctx); ws != nil {
|
||||
break
|
||||
} else if isHalted(w.ctx, err) {
|
||||
return nil, err
|
||||
} else if isHaltErr(w.ctx, err) {
|
||||
return nil, v3rpc.Error(err)
|
||||
}
|
||||
newConn, nerr := w.c.retryConnection(w.conn, nil)
|
||||
if nerr != nil {
|
||||
|
@ -42,4 +42,40 @@ var (
|
||||
ErrRoleAlreadyExist = grpc.Errorf(codes.FailedPrecondition, "etcdserver: role name already exists")
|
||||
ErrRoleNotFound = grpc.Errorf(codes.FailedPrecondition, "etcdserver: role name not found")
|
||||
ErrAuthFailed = grpc.Errorf(codes.InvalidArgument, "etcdserver: authentication failed, invalid user ID or password")
|
||||
|
||||
errStringToError = map[string]error{
|
||||
grpc.ErrorDesc(ErrEmptyKey): ErrEmptyKey,
|
||||
grpc.ErrorDesc(ErrTooManyOps): ErrTooManyOps,
|
||||
grpc.ErrorDesc(ErrDuplicateKey): ErrDuplicateKey,
|
||||
grpc.ErrorDesc(ErrCompacted): ErrCompacted,
|
||||
grpc.ErrorDesc(ErrFutureRev): ErrFutureRev,
|
||||
grpc.ErrorDesc(ErrNoSpace): ErrNoSpace,
|
||||
|
||||
grpc.ErrorDesc(ErrLeaseNotFound): ErrLeaseNotFound,
|
||||
grpc.ErrorDesc(ErrLeaseExist): ErrLeaseExist,
|
||||
|
||||
grpc.ErrorDesc(ErrMemberExist): ErrMemberExist,
|
||||
grpc.ErrorDesc(ErrPeerURLExist): ErrPeerURLExist,
|
||||
grpc.ErrorDesc(ErrMemberBadURLs): ErrMemberBadURLs,
|
||||
grpc.ErrorDesc(ErrMemberNotFound): ErrMemberNotFound,
|
||||
|
||||
grpc.ErrorDesc(ErrRequestTooLarge): ErrRequestTooLarge,
|
||||
|
||||
grpc.ErrorDesc(ErrUserAlreadyExist): ErrUserAlreadyExist,
|
||||
grpc.ErrorDesc(ErrUserNotFound): ErrUserNotFound,
|
||||
grpc.ErrorDesc(ErrRoleAlreadyExist): ErrRoleAlreadyExist,
|
||||
grpc.ErrorDesc(ErrRoleNotFound): ErrRoleNotFound,
|
||||
grpc.ErrorDesc(ErrAuthFailed): ErrAuthFailed,
|
||||
}
|
||||
)
|
||||
|
||||
func Error(err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
v, ok := errStringToError[err.Error()]
|
||||
if !ok {
|
||||
return err
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
@ -68,11 +68,12 @@ var (
|
||||
)
|
||||
|
||||
type ClusterConfig struct {
|
||||
Size int
|
||||
PeerTLS *transport.TLSInfo
|
||||
ClientTLS *transport.TLSInfo
|
||||
DiscoveryURL string
|
||||
UseGRPC bool
|
||||
Size int
|
||||
PeerTLS *transport.TLSInfo
|
||||
ClientTLS *transport.TLSInfo
|
||||
DiscoveryURL string
|
||||
UseGRPC bool
|
||||
QuotaBackendBytes int64
|
||||
}
|
||||
|
||||
type cluster struct {
|
||||
@ -196,7 +197,7 @@ func (c *cluster) HTTPMembers() []client.Member {
|
||||
|
||||
func (c *cluster) mustNewMember(t *testing.T) *member {
|
||||
name := c.name(rand.Int())
|
||||
m := mustNewMember(t, name, c.cfg.PeerTLS, c.cfg.ClientTLS)
|
||||
m := mustNewMember(t, name, c.cfg.PeerTLS, c.cfg.ClientTLS, c.cfg.QuotaBackendBytes)
|
||||
m.DiscoveryURL = c.cfg.DiscoveryURL
|
||||
if c.cfg.UseGRPC {
|
||||
if err := m.listenGRPC(); err != nil {
|
||||
@ -417,7 +418,7 @@ type member struct {
|
||||
|
||||
// mustNewMember return an inited member with the given name. If peerTLS is
|
||||
// set, it will use https scheme to communicate between peers.
|
||||
func mustNewMember(t *testing.T, name string, peerTLS *transport.TLSInfo, clientTLS *transport.TLSInfo) *member {
|
||||
func mustNewMember(t *testing.T, name string, peerTLS *transport.TLSInfo, clientTLS *transport.TLSInfo, quotaBackendBytes int64) *member {
|
||||
var err error
|
||||
m := &member{}
|
||||
|
||||
@ -464,6 +465,7 @@ func mustNewMember(t *testing.T, name string, peerTLS *transport.TLSInfo, client
|
||||
}
|
||||
m.ElectionTicks = electionTicks
|
||||
m.TickMs = uint(tickDuration / time.Millisecond)
|
||||
m.QuotaBackendBytes = quotaBackendBytes
|
||||
return m
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user