mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Fix passing default grpc call options in Kubernetes client
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
parent
fe796ab1aa
commit
2bcaed1e09
@ -16,6 +16,7 @@ package kubernetes
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||||
@ -31,7 +32,6 @@ func New(cfg clientv3.Config) (*Client, error) {
|
|||||||
}
|
}
|
||||||
kc := &Client{
|
kc := &Client{
|
||||||
Client: c,
|
Client: c,
|
||||||
kv: clientv3.RetryKVClient(c),
|
|
||||||
}
|
}
|
||||||
kc.Kubernetes = kc
|
kc.Kubernetes = kc
|
||||||
return kc, nil
|
return kc, nil
|
||||||
@ -40,15 +40,14 @@ func New(cfg clientv3.Config) (*Client, error) {
|
|||||||
type Client struct {
|
type Client struct {
|
||||||
*clientv3.Client
|
*clientv3.Client
|
||||||
Kubernetes Interface
|
Kubernetes Interface
|
||||||
kv pb.KVClient
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ Interface = (*Client)(nil)
|
var _ Interface = (*Client)(nil)
|
||||||
|
|
||||||
func (k Client) Get(ctx context.Context, key string, opts GetOptions) (resp GetResponse, err error) {
|
func (k Client) Get(ctx context.Context, key string, opts GetOptions) (resp GetResponse, err error) {
|
||||||
rangeResp, err := k.kv.Range(ctx, getRequest(key, opts.Revision))
|
rangeResp, err := k.KV.Get(ctx, key, clientv3.WithRev(opts.Revision), clientv3.WithLimit(1))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return resp, clientv3.ContextError(ctx, err)
|
return resp, err
|
||||||
}
|
}
|
||||||
resp.Revision = rangeResp.Header.Revision
|
resp.Revision = rangeResp.Header.Revision
|
||||||
if len(rangeResp.Kvs) == 1 {
|
if len(rangeResp.Kvs) == 1 {
|
||||||
@ -58,17 +57,14 @@ func (k Client) Get(ctx context.Context, key string, opts GetOptions) (resp GetR
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (k Client) List(ctx context.Context, prefix string, opts ListOptions) (resp ListResponse, err error) {
|
func (k Client) List(ctx context.Context, prefix string, opts ListOptions) (resp ListResponse, err error) {
|
||||||
rangeStart := prefix + opts.Continue
|
rangeStart := prefix
|
||||||
|
if opts.Continue != "" {
|
||||||
|
rangeStart = opts.Continue
|
||||||
|
}
|
||||||
rangeEnd := clientv3.GetPrefixRangeEnd(prefix)
|
rangeEnd := clientv3.GetPrefixRangeEnd(prefix)
|
||||||
|
rangeResp, err := k.KV.Get(ctx, rangeStart, clientv3.WithRange(rangeEnd), clientv3.WithLimit(opts.Limit), clientv3.WithRev(opts.Revision))
|
||||||
rangeResp, err := k.kv.Range(ctx, &pb.RangeRequest{
|
|
||||||
Key: []byte(rangeStart),
|
|
||||||
RangeEnd: []byte(rangeEnd),
|
|
||||||
Limit: opts.Limit,
|
|
||||||
Revision: opts.Revision,
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return resp, clientv3.ContextError(ctx, err)
|
return resp, err
|
||||||
}
|
}
|
||||||
resp.Kvs = rangeResp.Kvs
|
resp.Kvs = rangeResp.Kvs
|
||||||
resp.Count = rangeResp.Count
|
resp.Count = rangeResp.Count
|
||||||
@ -77,48 +73,51 @@ func (k Client) List(ctx context.Context, prefix string, opts ListOptions) (resp
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (k Client) Count(ctx context.Context, prefix string, _ CountOptions) (int64, error) {
|
func (k Client) Count(ctx context.Context, prefix string, _ CountOptions) (int64, error) {
|
||||||
resp, err := k.kv.Range(ctx, &pb.RangeRequest{
|
resp, err := k.KV.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithCountOnly())
|
||||||
Key: []byte(prefix),
|
|
||||||
RangeEnd: []byte(clientv3.GetPrefixRangeEnd(prefix)),
|
|
||||||
CountOnly: true,
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, clientv3.ContextError(ctx, err)
|
return 0, err
|
||||||
}
|
}
|
||||||
return resp.Count, nil
|
return resp.Count, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k Client) OptimisticPut(ctx context.Context, key string, value []byte, expectedRevision int64, opts PutOptions) (resp PutResponse, err error) {
|
func (k Client) OptimisticPut(ctx context.Context, key string, value []byte, expectedRevision int64, opts PutOptions) (resp PutResponse, err error) {
|
||||||
onSuccess := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte(key), Value: value, Lease: int64(opts.LeaseID)}}}
|
txn := k.KV.Txn(ctx).If(
|
||||||
|
clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision),
|
||||||
|
).Then(
|
||||||
|
clientv3.OpPut(key, string(value), clientv3.WithLease(opts.LeaseID)),
|
||||||
|
)
|
||||||
|
|
||||||
var onFailure *pb.RequestOp
|
|
||||||
if opts.GetOnFailure {
|
if opts.GetOnFailure {
|
||||||
onFailure = &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: getRequest(key, 0)}}
|
txn = txn.Else(clientv3.OpGet(key))
|
||||||
}
|
}
|
||||||
|
|
||||||
txnResp, err := k.optimisticTxn(ctx, key, expectedRevision, onSuccess, onFailure)
|
txnResp, err := txn.Commit()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return resp, clientv3.ContextError(ctx, err)
|
return resp, err
|
||||||
}
|
}
|
||||||
resp.Succeeded = txnResp.Succeeded
|
resp.Succeeded = txnResp.Succeeded
|
||||||
resp.Revision = txnResp.Header.Revision
|
resp.Revision = txnResp.Header.Revision
|
||||||
if opts.GetOnFailure && !txnResp.Succeeded {
|
if opts.GetOnFailure && !txnResp.Succeeded {
|
||||||
|
if len(txnResp.Responses) == 0 {
|
||||||
|
return resp, fmt.Errorf("invalid OptimisticPut response: %v", txnResp.Responses)
|
||||||
|
}
|
||||||
resp.KV = kvFromTxnResponse(txnResp.Responses[0])
|
resp.KV = kvFromTxnResponse(txnResp.Responses[0])
|
||||||
}
|
}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k Client) OptimisticDelete(ctx context.Context, key string, expectedRevision int64, opts DeleteOptions) (resp DeleteResponse, err error) {
|
func (k Client) OptimisticDelete(ctx context.Context, key string, expectedRevision int64, opts DeleteOptions) (resp DeleteResponse, err error) {
|
||||||
onSuccess := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte(key)}}}
|
txn := k.KV.Txn(ctx).If(
|
||||||
|
clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision),
|
||||||
var onFailure *pb.RequestOp
|
).Then(
|
||||||
|
clientv3.OpDelete(key),
|
||||||
|
)
|
||||||
if opts.GetOnFailure {
|
if opts.GetOnFailure {
|
||||||
onFailure = &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: getRequest(key, 0)}}
|
txn = txn.Else(clientv3.OpGet(key))
|
||||||
}
|
}
|
||||||
|
txnResp, err := txn.Commit()
|
||||||
txnResp, err := k.optimisticTxn(ctx, key, expectedRevision, onSuccess, onFailure)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return resp, clientv3.ContextError(ctx, err)
|
return resp, err
|
||||||
}
|
}
|
||||||
resp.Succeeded = txnResp.Succeeded
|
resp.Succeeded = txnResp.Succeeded
|
||||||
resp.Revision = txnResp.Header.Revision
|
resp.Revision = txnResp.Header.Revision
|
||||||
@ -128,34 +127,6 @@ func (k Client) OptimisticDelete(ctx context.Context, key string, expectedRevisi
|
|||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k Client) optimisticTxn(ctx context.Context, key string, expectedRevision int64, onSuccess, onFailure *pb.RequestOp) (*pb.TxnResponse, error) {
|
|
||||||
txn := &pb.TxnRequest{
|
|
||||||
Compare: []*pb.Compare{
|
|
||||||
{
|
|
||||||
Result: pb.Compare_EQUAL,
|
|
||||||
Target: pb.Compare_MOD,
|
|
||||||
Key: []byte(key),
|
|
||||||
TargetUnion: &pb.Compare_ModRevision{ModRevision: expectedRevision},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
if onSuccess != nil {
|
|
||||||
txn.Success = []*pb.RequestOp{onSuccess}
|
|
||||||
}
|
|
||||||
if onFailure != nil {
|
|
||||||
txn.Failure = []*pb.RequestOp{onFailure}
|
|
||||||
}
|
|
||||||
return k.kv.Txn(ctx, txn)
|
|
||||||
}
|
|
||||||
|
|
||||||
func getRequest(key string, revision int64) *pb.RangeRequest {
|
|
||||||
return &pb.RangeRequest{
|
|
||||||
Key: []byte(key),
|
|
||||||
Revision: revision,
|
|
||||||
Limit: 1,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func kvFromTxnResponse(resp *pb.ResponseOp) *mvccpb.KeyValue {
|
func kvFromTxnResponse(resp *pb.ResponseOp) *mvccpb.KeyValue {
|
||||||
getResponse := resp.GetResponseRange()
|
getResponse := resp.GetResponseRange()
|
||||||
if len(getResponse.Kvs) == 1 {
|
if len(getResponse.Kvs) == 1 {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user