Introduce Kubernetes KV interface to etcd client

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
Marek Siarkowicz 2023-07-31 11:09:37 +02:00
parent c9fdc609bb
commit 6c98a96866
10 changed files with 365 additions and 58 deletions

View File

@ -135,67 +135,67 @@ func NewAuthFromAuthClient(remote pb.AuthClient, c *Client) Auth {
func (auth *authClient) Authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) {
resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}, auth.callOpts...)
return (*AuthenticateResponse)(resp), toErr(ctx, err)
return (*AuthenticateResponse)(resp), ContextError(ctx, err)
}
func (auth *authClient) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) {
resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, auth.callOpts...)
return (*AuthEnableResponse)(resp), toErr(ctx, err)
return (*AuthEnableResponse)(resp), ContextError(ctx, err)
}
func (auth *authClient) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) {
resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, auth.callOpts...)
return (*AuthDisableResponse)(resp), toErr(ctx, err)
return (*AuthDisableResponse)(resp), ContextError(ctx, err)
}
func (auth *authClient) AuthStatus(ctx context.Context) (*AuthStatusResponse, error) {
resp, err := auth.remote.AuthStatus(ctx, &pb.AuthStatusRequest{}, auth.callOpts...)
return (*AuthStatusResponse)(resp), toErr(ctx, err)
return (*AuthStatusResponse)(resp), ContextError(ctx, err)
}
func (auth *authClient) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) {
resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password, Options: &authpb.UserAddOptions{NoPassword: false}}, auth.callOpts...)
return (*AuthUserAddResponse)(resp), toErr(ctx, err)
return (*AuthUserAddResponse)(resp), ContextError(ctx, err)
}
func (auth *authClient) UserAddWithOptions(ctx context.Context, name string, password string, options *UserAddOptions) (*AuthUserAddResponse, error) {
resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password, Options: (*authpb.UserAddOptions)(options)}, auth.callOpts...)
return (*AuthUserAddResponse)(resp), toErr(ctx, err)
return (*AuthUserAddResponse)(resp), ContextError(ctx, err)
}
func (auth *authClient) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) {
resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}, auth.callOpts...)
return (*AuthUserDeleteResponse)(resp), toErr(ctx, err)
return (*AuthUserDeleteResponse)(resp), ContextError(ctx, err)
}
func (auth *authClient) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) {
resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}, auth.callOpts...)
return (*AuthUserChangePasswordResponse)(resp), toErr(ctx, err)
return (*AuthUserChangePasswordResponse)(resp), ContextError(ctx, err)
}
func (auth *authClient) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) {
resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}, auth.callOpts...)
return (*AuthUserGrantRoleResponse)(resp), toErr(ctx, err)
return (*AuthUserGrantRoleResponse)(resp), ContextError(ctx, err)
}
func (auth *authClient) UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error) {
resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}, auth.callOpts...)
return (*AuthUserGetResponse)(resp), toErr(ctx, err)
return (*AuthUserGetResponse)(resp), ContextError(ctx, err)
}
func (auth *authClient) UserList(ctx context.Context) (*AuthUserListResponse, error) {
resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{}, auth.callOpts...)
return (*AuthUserListResponse)(resp), toErr(ctx, err)
return (*AuthUserListResponse)(resp), ContextError(ctx, err)
}
func (auth *authClient) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) {
resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}, auth.callOpts...)
return (*AuthUserRevokeRoleResponse)(resp), toErr(ctx, err)
return (*AuthUserRevokeRoleResponse)(resp), ContextError(ctx, err)
}
func (auth *authClient) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) {
resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}, auth.callOpts...)
return (*AuthRoleAddResponse)(resp), toErr(ctx, err)
return (*AuthRoleAddResponse)(resp), ContextError(ctx, err)
}
func (auth *authClient) RoleGrantPermission(ctx context.Context, name string, key, rangeEnd string, permType PermissionType) (*AuthRoleGrantPermissionResponse, error) {
@ -205,27 +205,27 @@ func (auth *authClient) RoleGrantPermission(ctx context.Context, name string, ke
PermType: authpb.Permission_Type(permType),
}
resp, err := auth.remote.RoleGrantPermission(ctx, &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: perm}, auth.callOpts...)
return (*AuthRoleGrantPermissionResponse)(resp), toErr(ctx, err)
return (*AuthRoleGrantPermissionResponse)(resp), ContextError(ctx, err)
}
func (auth *authClient) RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error) {
resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}, auth.callOpts...)
return (*AuthRoleGetResponse)(resp), toErr(ctx, err)
return (*AuthRoleGetResponse)(resp), ContextError(ctx, err)
}
func (auth *authClient) RoleList(ctx context.Context) (*AuthRoleListResponse, error) {
resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{}, auth.callOpts...)
return (*AuthRoleListResponse)(resp), toErr(ctx, err)
return (*AuthRoleListResponse)(resp), ContextError(ctx, err)
}
func (auth *authClient) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) {
resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: []byte(key), RangeEnd: []byte(rangeEnd)}, auth.callOpts...)
return (*AuthRoleRevokePermissionResponse)(resp), toErr(ctx, err)
return (*AuthRoleRevokePermissionResponse)(resp), ContextError(ctx, err)
}
func (auth *authClient) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) {
resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}, auth.callOpts...)
return (*AuthRoleDeleteResponse)(resp), toErr(ctx, err)
return (*AuthRoleDeleteResponse)(resp), ContextError(ctx, err)
}
func StrToPermissionType(s string) (PermissionType, error) {

View File

@ -154,7 +154,7 @@ func (c *Client) Close() error {
c.Lease.Close()
}
if c.conn != nil {
return toErr(c.ctx, c.conn.Close())
return ContextError(c.ctx, c.conn.Close())
}
return c.ctx.Err()
}
@ -598,7 +598,9 @@ func isUnavailableErr(ctx context.Context, err error) bool {
return false
}
func toErr(ctx context.Context, err error) error {
// ContextError converts the error into an EtcdError if the error message matches one of
// the defined messages; otherwise, it tries to retrieve the context error.
func ContextError(ctx context.Context, err error) error {
if err == nil {
return nil
}

View File

@ -93,7 +93,7 @@ func (c *cluster) memberAdd(ctx context.Context, peerAddrs []string, isLearner b
}
resp, err := c.remote.MemberAdd(ctx, r, c.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
return nil, ContextError(ctx, err)
}
return (*MemberAddResponse)(resp), nil
}
@ -102,7 +102,7 @@ func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveRes
r := &pb.MemberRemoveRequest{ID: id}
resp, err := c.remote.MemberRemove(ctx, r, c.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
return nil, ContextError(ctx, err)
}
return (*MemberRemoveResponse)(resp), nil
}
@ -119,7 +119,7 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin
if err == nil {
return (*MemberUpdateResponse)(resp), nil
}
return nil, toErr(ctx, err)
return nil, ContextError(ctx, err)
}
func (c *cluster) MemberList(ctx context.Context, opts ...OpOption) (*MemberListResponse, error) {
@ -128,14 +128,14 @@ func (c *cluster) MemberList(ctx context.Context, opts ...OpOption) (*MemberList
if err == nil {
return (*MemberListResponse)(resp), nil
}
return nil, toErr(ctx, err)
return nil, ContextError(ctx, err)
}
func (c *cluster) MemberPromote(ctx context.Context, id uint64) (*MemberPromoteResponse, error) {
r := &pb.MemberPromoteRequest{ID: id}
resp, err := c.remote.MemberPromote(ctx, r, c.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
return nil, ContextError(ctx, err)
}
return (*MemberPromoteResponse)(resp), nil
}

View File

@ -0,0 +1,165 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes
import (
"context"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
)
// New creates Client from config.
// Caller is responsible to call Close() to clean up client.
func New(cfg clientv3.Config) (*Client, error) {
c, err := clientv3.New(cfg)
if err != nil {
return nil, err
}
kc := &Client{
Client: c,
kv: clientv3.RetryKVClient(c),
}
kc.Kubernetes = kc
return kc, nil
}
type Client struct {
*clientv3.Client
Kubernetes Interface
kv pb.KVClient
}
var _ Interface = (*Client)(nil)
func (k Client) Get(ctx context.Context, key string, opts GetOptions) (resp GetResponse, err error) {
rangeResp, err := k.kv.Range(ctx, getRequest(key, opts.Revision))
if err != nil {
return resp, clientv3.ContextError(ctx, err)
}
resp.Revision = rangeResp.Header.Revision
if len(rangeResp.Kvs) == 1 {
resp.KV = rangeResp.Kvs[0]
}
return resp, nil
}
func (k Client) List(ctx context.Context, prefix string, opts ListOptions) (resp ListResponse, err error) {
rangeStart := prefix + opts.Continue
rangeEnd := clientv3.GetPrefixRangeEnd(prefix)
rangeResp, err := k.kv.Range(ctx, &pb.RangeRequest{
Key: []byte(rangeStart),
RangeEnd: []byte(rangeEnd),
Limit: opts.Limit,
Revision: opts.Revision,
})
if err != nil {
return resp, clientv3.ContextError(ctx, err)
}
resp.Kvs = rangeResp.Kvs
resp.Count = rangeResp.Count
resp.Revision = rangeResp.Header.Revision
return resp, nil
}
func (k Client) Count(ctx context.Context, prefix string, _ CountOptions) (int64, error) {
resp, err := k.kv.Range(ctx, &pb.RangeRequest{
Key: []byte(prefix),
RangeEnd: []byte(clientv3.GetPrefixRangeEnd(prefix)),
CountOnly: true,
})
if err != nil {
return 0, clientv3.ContextError(ctx, err)
}
return resp.Count, nil
}
func (k Client) OptimisticPut(ctx context.Context, key string, value []byte, expectedRevision int64, opts PutOptions) (resp PutResponse, err error) {
onSuccess := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte(key), Value: value, Lease: int64(opts.LeaseID)}}}
var onFailure *pb.RequestOp
if opts.GetOnFailure {
onFailure = &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: getRequest(key, 0)}}
}
txnResp, err := k.optimisticTxn(ctx, key, expectedRevision, onSuccess, onFailure)
if err != nil {
return resp, clientv3.ContextError(ctx, err)
}
resp.Succeeded = txnResp.Succeeded
resp.Revision = txnResp.Header.Revision
if opts.GetOnFailure && !txnResp.Succeeded {
resp.KV = kvFromTxnResponse(txnResp.Responses[0])
}
return resp, nil
}
func (k Client) OptimisticDelete(ctx context.Context, key string, expectedRevision int64, opts DeleteOptions) (resp DeleteResponse, err error) {
onSuccess := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte(key)}}}
var onFailure *pb.RequestOp
if opts.GetOnFailure {
onFailure = &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: getRequest(key, 0)}}
}
txnResp, err := k.optimisticTxn(ctx, key, expectedRevision, onSuccess, onFailure)
if err != nil {
return resp, clientv3.ContextError(ctx, err)
}
resp.Succeeded = txnResp.Succeeded
resp.Revision = txnResp.Header.Revision
if opts.GetOnFailure && !txnResp.Succeeded {
resp.KV = kvFromTxnResponse(txnResp.Responses[0])
}
return resp, nil
}
func (k Client) optimisticTxn(ctx context.Context, key string, expectedRevision int64, onSuccess, onFailure *pb.RequestOp) (*pb.TxnResponse, error) {
txn := &pb.TxnRequest{
Compare: []*pb.Compare{
{
Result: pb.Compare_EQUAL,
Target: pb.Compare_MOD,
Key: []byte(key),
TargetUnion: &pb.Compare_ModRevision{ModRevision: expectedRevision},
},
},
}
if onSuccess != nil {
txn.Success = []*pb.RequestOp{onSuccess}
}
if onFailure != nil {
txn.Failure = []*pb.RequestOp{onFailure}
}
return k.kv.Txn(ctx, txn)
}
func getRequest(key string, revision int64) *pb.RangeRequest {
return &pb.RangeRequest{
Key: []byte(key),
Revision: revision,
Limit: 1,
}
}
func kvFromTxnResponse(resp *pb.ResponseOp) *mvccpb.KeyValue {
getResponse := resp.GetResponseRange()
if len(getResponse.Kvs) == 1 {
return getResponse.Kvs[0]
}
return nil
}

View File

@ -0,0 +1,140 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes
import (
"context"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
)
// Interface defines the minimal client-side interface that Kubernetes requires
// to interact with etcd. Methods below are standard etcd operations with
// semantics adjusted to better suit Kubernetes' needs.
type Interface interface {
// Get retrieves a single key-value pair from etcd.
//
// If opts.Revision is set to a non-zero value, the key-value pair is retrieved at the specified revision.
// If the required revision has been compacted, the request will fail with ErrCompacted.
Get(ctx context.Context, key string, opts GetOptions) (GetResponse, error)
// List retrieves key-value pairs with the specified prefix, ordered lexicographically by key.
//
// If opts.Revision is non-zero, the key-value pairs are retrieved at the specified revision.
// If the required revision has been compacted, the request will fail with ErrCompacted.
// If opts.Limit is greater than zero, the number of returned key-value pairs is bounded by the limit.
// If opts.Continue is not empty, the listing will start from the key immediately after the one specified by Continue.
// The Continue value should be the last key returned in a previous paginated ListResponse.
List(ctx context.Context, prefix string, opts ListOptions) (ListResponse, error)
// Count returns the number of keys with the specified prefix.
//
// Currently, there are no options for the Count operation. However, a placeholder options struct (CountOptions)
// is provided for future extensibility in case options become necessary.
Count(ctx context.Context, prefix string, opts CountOptions) (int64, error)
// OptimisticPut creates or updates a key-value pair if the key has not been modified or created
// since the revision specified in expectedRevision.
//
// An OptimisticPut fails if the key has been modified since expectedRevision.
OptimisticPut(ctx context.Context, key string, value []byte, expectedRevision int64, opts PutOptions) (PutResponse, error)
// OptimisticDelete deletes the key-value pair if it hasn't been modified since the revision
// specified in expectedRevision.
//
// An OptimisticDelete fails if the key has been modified since expectedRevision.
OptimisticDelete(ctx context.Context, key string, expectedRevision int64, opts DeleteOptions) (DeleteResponse, error)
}
type GetOptions struct {
// Revision is the point-in-time of the etcd key-value store to use for the Get operation.
// If Revision is 0, it gets the latest value.
Revision int64
}
type ListOptions struct {
// Revision is the point-in-time of the etcd key-value store to use for the List operation.
// If Revision is 0, it gets the latest values.
Revision int64
// Limit is the maximum number of keys to return for a List operation.
// 0 means no limitation.
Limit int64
// Continue is a key from which to resume the List operation, excluding the given key.
// It should be set to the last key from a previous ListResponse when paginating.
Continue string
}
// CountOptions is a placeholder for potential future options for the Count operation.
type CountOptions struct{}
type PutOptions struct {
// GetOnFailure specifies whether to return the modified key-value pair if the Put operation fails due to a revision mismatch.
GetOnFailure bool
// LeaseID is the ID of a lease to associate with the key allowing for automatic deletion after lease expires after it's TTL (time to live).
// Deprecated: Should be replaced with TTL when Interface starts using one lease per object.
LeaseID clientv3.LeaseID
}
type DeleteOptions struct {
// GetOnFailure specifies whether to return the modified key-value pair if the Delete operation fails due to a revision mismatch.
GetOnFailure bool
}
type GetResponse struct {
// KV is the key-value pair retrieved from etcd.
KV *mvccpb.KeyValue
// Revision is the revision of the key-value store at the time of the Get operation.
Revision int64
}
type ListResponse struct {
// Kvs is the list of key-value pairs retrieved from etcd, ordered lexicographically by key.
Kvs []*mvccpb.KeyValue
// Count is the total number of keys with the specified prefix, even if not all were returned due to a limit.
Count int64
// Revision is the revision of the key-value store at the time of the List operation.
Revision int64
}
type PutResponse struct {
// KV is the created or updated key-value pair. If the Put operation failed and GetOnFailure was true, this
// will be the modified key-value pair that caused the failure.
KV *mvccpb.KeyValue
// Succeeded indicates whether the Put operation was successful.
Succeeded bool
// Revision is the revision of the key-value store after the Put operation.
Revision int64
}
type DeleteResponse struct {
// KV is the deleted key-value pair. If the Delete operation failed and GetOnFailure was true, this
// will be the modified key-value pair that caused the failure.
KV *mvccpb.KeyValue
// Succeeded indicates whether the Delete operation was successful.
Succeeded bool
// Revision is the revision of the key-value store after the Delete operation.
Revision int64
}

View File

@ -113,23 +113,23 @@ func NewKVFromKVClient(remote pb.KVClient, c *Client) KV {
func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
r, err := kv.Do(ctx, OpPut(key, val, opts...))
return r.put, toErr(ctx, err)
return r.put, ContextError(ctx, err)
}
func (kv *kv) Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) {
r, err := kv.Do(ctx, OpGet(key, opts...))
return r.get, toErr(ctx, err)
return r.get, ContextError(ctx, err)
}
func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error) {
r, err := kv.Do(ctx, OpDelete(key, opts...))
return r.del, toErr(ctx, err)
return r.del, ContextError(ctx, err)
}
func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error) {
resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest(), kv.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
return nil, ContextError(ctx, err)
}
return (*CompactResponse)(resp), err
}
@ -178,5 +178,5 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
default:
panic("Unknown op")
}
return OpResponse{}, toErr(ctx, err)
return OpResponse{}, ContextError(ctx, err)
}

View File

@ -223,7 +223,7 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err
}
return gresp, nil
}
return nil, toErr(ctx, err)
return nil, ContextError(ctx, err)
}
func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
@ -232,14 +232,14 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse,
if err == nil {
return (*LeaseRevokeResponse)(resp), nil
}
return nil, toErr(ctx, err)
return nil, ContextError(ctx, err)
}
func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) {
r := toLeaseTimeToLiveRequest(id, opts...)
resp, err := l.remote.LeaseTimeToLive(ctx, r, l.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
return nil, ContextError(ctx, err)
}
gresp := &LeaseTimeToLiveResponse{
ResponseHeader: resp.GetHeader(),
@ -260,7 +260,7 @@ func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) {
}
return &LeaseLeasesResponse{ResponseHeader: resp.GetHeader(), Leases: leases}, nil
}
return nil, toErr(ctx, err)
return nil, ContextError(ctx, err)
}
func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
@ -315,7 +315,7 @@ func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive
return resp, err
}
if isHaltErr(ctx, err) {
return nil, toErr(ctx, err)
return nil, ContextError(ctx, err)
}
}
}
@ -405,13 +405,13 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (karesp *LeaseKe
stream, err := l.remote.LeaseKeepAlive(cctx, l.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
return nil, ContextError(ctx, err)
}
defer func() {
if cerr := stream.CloseSend(); cerr != nil {
if ferr == nil {
ferr = toErr(ctx, cerr)
ferr = ContextError(ctx, cerr)
}
return
}
@ -419,12 +419,12 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (karesp *LeaseKe
err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)})
if err != nil {
return nil, toErr(ctx, err)
return nil, ContextError(ctx, err)
}
resp, rerr := stream.Recv()
if rerr != nil {
return nil, toErr(ctx, rerr)
return nil, ContextError(ctx, rerr)
}
karesp = &LeaseKeepAliveResponse{
@ -464,7 +464,7 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) {
return err
}
if toErr(l.stopCtx, err) == rpctypes.ErrNoLeader {
if ContextError(l.stopCtx, err) == rpctypes.ErrNoLeader {
l.closeRequireLeader()
}
break

View File

@ -155,7 +155,7 @@ func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
if err == nil {
return (*AlarmResponse)(resp), nil
}
return nil, toErr(ctx, err)
return nil, ContextError(ctx, err)
}
func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmResponse, error) {
@ -168,13 +168,13 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR
if req.MemberID == 0 && req.Alarm == pb.AlarmType_NONE {
ar, err := m.AlarmList(ctx)
if err != nil {
return nil, toErr(ctx, err)
return nil, ContextError(ctx, err)
}
ret := AlarmResponse{}
for _, am := range ar.Alarms {
dresp, derr := m.AlarmDisarm(ctx, (*AlarmMember)(am))
if derr != nil {
return nil, toErr(ctx, derr)
return nil, ContextError(ctx, derr)
}
ret.Alarms = append(ret.Alarms, dresp.Alarms...)
}
@ -185,18 +185,18 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR
if err == nil {
return (*AlarmResponse)(resp), nil
}
return nil, toErr(ctx, err)
return nil, ContextError(ctx, err)
}
func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error) {
remote, cancel, err := m.dial(endpoint)
if err != nil {
return nil, toErr(ctx, err)
return nil, ContextError(ctx, err)
}
defer cancel()
resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}, m.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
return nil, ContextError(ctx, err)
}
return (*DefragmentResponse)(resp), nil
}
@ -204,12 +204,12 @@ func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*Defragm
func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusResponse, error) {
remote, cancel, err := m.dial(endpoint)
if err != nil {
return nil, toErr(ctx, err)
return nil, ContextError(ctx, err)
}
defer cancel()
resp, err := remote.Status(ctx, &pb.StatusRequest{}, m.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
return nil, ContextError(ctx, err)
}
return (*StatusResponse)(resp), nil
}
@ -218,12 +218,12 @@ func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*
remote, cancel, err := m.dial(endpoint)
if err != nil {
return nil, toErr(ctx, err)
return nil, ContextError(ctx, err)
}
defer cancel()
resp, err := remote.HashKV(ctx, &pb.HashKVRequest{Revision: rev}, m.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
return nil, ContextError(ctx, err)
}
return (*HashKVResponse)(resp), nil
}
@ -231,7 +231,7 @@ func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*
func (m *maintenance) SnapshotWithVersion(ctx context.Context) (*SnapshotResponse, error) {
ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, append(m.callOpts, withMax(defaultStreamMaxRetries))...)
if err != nil {
return nil, toErr(ctx, err)
return nil, ContextError(ctx, err)
}
m.lg.Info("opened snapshot stream; downloading")
@ -274,7 +274,7 @@ func (m *maintenance) SnapshotWithVersion(ctx context.Context) (*SnapshotRespons
func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, append(m.callOpts, withMax(defaultStreamMaxRetries))...)
if err != nil {
return nil, toErr(ctx, err)
return nil, ContextError(ctx, err)
}
m.lg.Info("opened snapshot stream; downloading")
@ -326,12 +326,12 @@ type snapshotReadCloser struct {
func (rc *snapshotReadCloser) Read(p []byte) (n int, err error) {
n, err = rc.ReadCloser.Read(p)
return n, toErr(rc.ctx, err)
return n, ContextError(rc.ctx, err)
}
func (m *maintenance) MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error) {
resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID}, m.callOpts...)
return (*MoveLeaderResponse)(resp), toErr(ctx, err)
return (*MoveLeaderResponse)(resp), ContextError(ctx, err)
}
func (m *maintenance) Downgrade(ctx context.Context, action DowngradeAction, version string) (*DowngradeResponse, error) {
@ -347,5 +347,5 @@ func (m *maintenance) Downgrade(ctx context.Context, action DowngradeAction, ver
return nil, errors.New("etcdclient: unknown downgrade action")
}
resp, err := m.remote.Downgrade(ctx, &pb.DowngradeRequest{Action: actionType, Version: version}, m.callOpts...)
return (*DowngradeResponse)(resp), toErr(ctx, err)
return (*DowngradeResponse)(resp), ContextError(ctx, err)
}

View File

@ -144,7 +144,7 @@ func (txn *txn) Commit() (*TxnResponse, error) {
var err error
resp, err = txn.kv.remote.Txn(txn.ctx, r, txn.callOpts...)
if err != nil {
return nil, toErr(txn.ctx, err)
return nil, ContextError(txn.ctx, err)
}
return (*TxnResponse)(resp), nil
}

View File

@ -442,7 +442,7 @@ func (w *watchGRPCStream) close() (err error) {
case err = <-w.errc:
default:
}
return toErr(w.ctx, err)
return ContextError(w.ctx, err)
}
func (w *watcher) closeStream(wgs *watchGRPCStream) {
@ -653,7 +653,7 @@ func (w *watchGRPCStream) run() {
// watch client failed on Recv; spawn another if possible
case err := <-w.errc:
if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
if isHaltErr(w.ctx, err) || ContextError(w.ctx, err) == v3rpc.ErrNoLeader {
closeErr = err
return
}