mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #8341 from visheshnp/leasing-pr
clientv3: Disconnected Linearized Reads
This commit is contained in:
1939
clientv3/integration/leasing_test.go
Normal file
1939
clientv3/integration/leasing_test.go
Normal file
File diff suppressed because it is too large
Load Diff
@@ -74,16 +74,16 @@ func (op OpResponse) Get() *GetResponse { return op.get }
|
||||
func (op OpResponse) Del() *DeleteResponse { return op.del }
|
||||
func (op OpResponse) Txn() *TxnResponse { return op.txn }
|
||||
|
||||
func (resp *PutResponse) ToOpResponse() OpResponse {
|
||||
func (resp *PutResponse) OpResponse() OpResponse {
|
||||
return OpResponse{put: resp}
|
||||
}
|
||||
func (resp *GetResponse) ToOpResponse() OpResponse {
|
||||
func (resp *GetResponse) OpResponse() OpResponse {
|
||||
return OpResponse{get: resp}
|
||||
}
|
||||
func (resp *DeleteResponse) ToOpResponse() OpResponse {
|
||||
func (resp *DeleteResponse) OpResponse() OpResponse {
|
||||
return OpResponse{del: resp}
|
||||
}
|
||||
func (resp *TxnResponse) ToOpResponse() OpResponse {
|
||||
func (resp *TxnResponse) OpResponse() OpResponse {
|
||||
return OpResponse{txn: resp}
|
||||
}
|
||||
|
||||
|
||||
306
clientv3/leasing/cache.go
Normal file
306
clientv3/leasing/cache.go
Normal file
@@ -0,0 +1,306 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package leasing
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
v3 "github.com/coreos/etcd/clientv3"
|
||||
v3pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/mvcc/mvccpb"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
const revokeBackoff = 2 * time.Second
|
||||
|
||||
type leaseCache struct {
|
||||
mu sync.RWMutex
|
||||
entries map[string]*leaseKey
|
||||
revokes map[string]time.Time
|
||||
header *v3pb.ResponseHeader
|
||||
}
|
||||
|
||||
type leaseKey struct {
|
||||
response *v3.GetResponse
|
||||
// rev is the leasing key revision.
|
||||
rev int64
|
||||
waitc chan struct{}
|
||||
}
|
||||
|
||||
func (lc *leaseCache) Rev(key string) int64 {
|
||||
lc.mu.RLock()
|
||||
defer lc.mu.RUnlock()
|
||||
if li := lc.entries[key]; li != nil {
|
||||
return li.rev
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (lc *leaseCache) Lock(key string) (chan<- struct{}, int64) {
|
||||
lc.mu.Lock()
|
||||
defer lc.mu.Unlock()
|
||||
if li := lc.entries[key]; li != nil {
|
||||
li.waitc = make(chan struct{})
|
||||
return li.waitc, li.rev
|
||||
}
|
||||
return nil, 0
|
||||
}
|
||||
|
||||
func (lc *leaseCache) LockRange(begin, end string) (ret []chan<- struct{}) {
|
||||
lc.mu.Lock()
|
||||
defer lc.mu.Unlock()
|
||||
for k, li := range lc.entries {
|
||||
if inRange(k, begin, end) {
|
||||
li.waitc = make(chan struct{})
|
||||
ret = append(ret, li.waitc)
|
||||
}
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func inRange(k, begin, end string) bool {
|
||||
if strings.Compare(k, begin) < 0 {
|
||||
return false
|
||||
}
|
||||
if end != "\x00" && strings.Compare(k, end) >= 0 {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (lc *leaseCache) LockWriteOps(ops []v3.Op) (ret []chan<- struct{}) {
|
||||
for _, op := range ops {
|
||||
if op.IsGet() {
|
||||
continue
|
||||
}
|
||||
key := string(op.KeyBytes())
|
||||
if end := string(op.RangeBytes()); end == "" {
|
||||
if wc, _ := lc.Lock(key); wc != nil {
|
||||
ret = append(ret, wc)
|
||||
}
|
||||
} else {
|
||||
for k := range lc.entries {
|
||||
if !inRange(k, key, end) {
|
||||
continue
|
||||
}
|
||||
if wc, _ := lc.Lock(k); wc != nil {
|
||||
ret = append(ret, wc)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func (lc *leaseCache) NotifyOps(ops []v3.Op) (wcs []<-chan struct{}) {
|
||||
for _, op := range ops {
|
||||
if op.IsGet() {
|
||||
if _, wc := lc.notify(string(op.KeyBytes())); wc != nil {
|
||||
wcs = append(wcs, wc)
|
||||
}
|
||||
}
|
||||
}
|
||||
return wcs
|
||||
}
|
||||
|
||||
func (lc *leaseCache) MayAcquire(key string) bool {
|
||||
lc.mu.RLock()
|
||||
lr, ok := lc.revokes[key]
|
||||
lc.mu.RUnlock()
|
||||
return !ok || time.Since(lr) > revokeBackoff
|
||||
}
|
||||
|
||||
func (lc *leaseCache) Add(key string, resp *v3.GetResponse, op v3.Op) *v3.GetResponse {
|
||||
lk := &leaseKey{resp, resp.Header.Revision, closedCh}
|
||||
lc.mu.Lock()
|
||||
if lc.header == nil || lc.header.Revision < resp.Header.Revision {
|
||||
lc.header = resp.Header
|
||||
}
|
||||
lc.entries[key] = lk
|
||||
ret := lk.get(op)
|
||||
lc.mu.Unlock()
|
||||
return ret
|
||||
}
|
||||
|
||||
func (lc *leaseCache) Update(key, val []byte, respHeader *v3pb.ResponseHeader) {
|
||||
li := lc.entries[string(key)]
|
||||
if li == nil {
|
||||
return
|
||||
}
|
||||
cacheResp := li.response
|
||||
if len(cacheResp.Kvs) == 0 {
|
||||
kv := &mvccpb.KeyValue{
|
||||
Key: key,
|
||||
CreateRevision: respHeader.Revision,
|
||||
}
|
||||
cacheResp.Kvs = append(cacheResp.Kvs, kv)
|
||||
cacheResp.Count = 1
|
||||
}
|
||||
cacheResp.Kvs[0].Version++
|
||||
if cacheResp.Kvs[0].ModRevision < respHeader.Revision {
|
||||
cacheResp.Header = respHeader
|
||||
cacheResp.Kvs[0].ModRevision = respHeader.Revision
|
||||
cacheResp.Kvs[0].Value = val
|
||||
}
|
||||
}
|
||||
|
||||
func (lc *leaseCache) Delete(key string, hdr *v3pb.ResponseHeader) {
|
||||
lc.mu.Lock()
|
||||
defer lc.mu.Unlock()
|
||||
lc.delete(key, hdr)
|
||||
}
|
||||
|
||||
func (lc *leaseCache) delete(key string, hdr *v3pb.ResponseHeader) {
|
||||
if li := lc.entries[key]; li != nil && hdr.Revision >= li.response.Header.Revision {
|
||||
li.response.Kvs = nil
|
||||
li.response.Header = copyHeader(hdr)
|
||||
}
|
||||
}
|
||||
|
||||
func (lc *leaseCache) Evict(key string) (rev int64) {
|
||||
lc.mu.Lock()
|
||||
defer lc.mu.Unlock()
|
||||
if li := lc.entries[key]; li != nil {
|
||||
rev = li.rev
|
||||
delete(lc.entries, key)
|
||||
lc.revokes[key] = time.Now()
|
||||
}
|
||||
return rev
|
||||
}
|
||||
|
||||
func (lc *leaseCache) EvictRange(key, end string) {
|
||||
lc.mu.Lock()
|
||||
defer lc.mu.Unlock()
|
||||
for k := range lc.entries {
|
||||
if inRange(k, key, end) {
|
||||
delete(lc.entries, key)
|
||||
lc.revokes[key] = time.Now()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func isBadOp(op v3.Op) bool { return op.Rev() > 0 || len(op.RangeBytes()) > 0 }
|
||||
|
||||
func (lc *leaseCache) Get(ctx context.Context, op v3.Op) (*v3.GetResponse, bool) {
|
||||
if isBadOp(op) {
|
||||
return nil, false
|
||||
}
|
||||
key := string(op.KeyBytes())
|
||||
li, wc := lc.notify(key)
|
||||
if li == nil {
|
||||
return nil, true
|
||||
}
|
||||
select {
|
||||
case <-wc:
|
||||
case <-ctx.Done():
|
||||
return nil, true
|
||||
}
|
||||
lc.mu.RLock()
|
||||
lk := *li
|
||||
ret := lk.get(op)
|
||||
lc.mu.RUnlock()
|
||||
return ret, true
|
||||
}
|
||||
|
||||
func (lk *leaseKey) get(op v3.Op) *v3.GetResponse {
|
||||
ret := *lk.response
|
||||
ret.Header = copyHeader(ret.Header)
|
||||
empty := len(ret.Kvs) == 0 || op.IsCountOnly()
|
||||
empty = empty || (op.MinModRev() > ret.Kvs[0].ModRevision)
|
||||
empty = empty || (op.MaxModRev() != 0 && op.MaxModRev() < ret.Kvs[0].ModRevision)
|
||||
empty = empty || (op.MinCreateRev() > ret.Kvs[0].CreateRevision)
|
||||
empty = empty || (op.MaxCreateRev() != 0 && op.MaxCreateRev() < ret.Kvs[0].CreateRevision)
|
||||
if empty {
|
||||
ret.Kvs = nil
|
||||
} else {
|
||||
kv := *ret.Kvs[0]
|
||||
kv.Key = make([]byte, len(kv.Key))
|
||||
copy(kv.Key, ret.Kvs[0].Key)
|
||||
if !op.IsKeysOnly() {
|
||||
kv.Value = make([]byte, len(kv.Value))
|
||||
copy(kv.Value, ret.Kvs[0].Value)
|
||||
}
|
||||
ret.Kvs = []*mvccpb.KeyValue{&kv}
|
||||
}
|
||||
return &ret
|
||||
}
|
||||
|
||||
func (lc *leaseCache) notify(key string) (*leaseKey, <-chan struct{}) {
|
||||
lc.mu.RLock()
|
||||
defer lc.mu.RUnlock()
|
||||
if li := lc.entries[key]; li != nil {
|
||||
return li, li.waitc
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (lc *leaseCache) clearOldRevokes(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(time.Second):
|
||||
lc.mu.Lock()
|
||||
for k, lr := range lc.revokes {
|
||||
if time.Now().Sub(lr.Add(revokeBackoff)) > 0 {
|
||||
delete(lc.revokes, k)
|
||||
}
|
||||
}
|
||||
lc.mu.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (lc *leaseCache) evalCmp(cmps []v3.Cmp) (cmpVal bool, ok bool) {
|
||||
for _, cmp := range cmps {
|
||||
if len(cmp.RangeEnd) > 0 {
|
||||
return false, false
|
||||
}
|
||||
lk := lc.entries[string(cmp.Key)]
|
||||
if lk == nil {
|
||||
return false, false
|
||||
}
|
||||
if !evalCmp(lk.response, cmp) {
|
||||
return false, true
|
||||
}
|
||||
}
|
||||
return true, true
|
||||
}
|
||||
|
||||
func (lc *leaseCache) evalOps(ops []v3.Op) ([]*v3pb.ResponseOp, bool) {
|
||||
resps := make([]*v3pb.ResponseOp, len(ops))
|
||||
for i, op := range ops {
|
||||
if !op.IsGet() || isBadOp(op) {
|
||||
// TODO: support read-only txns
|
||||
return nil, false
|
||||
}
|
||||
lk := lc.entries[string(op.KeyBytes())]
|
||||
if lk == nil {
|
||||
return nil, false
|
||||
}
|
||||
resp := lk.get(op)
|
||||
if resp == nil {
|
||||
return nil, false
|
||||
}
|
||||
resps[i] = &v3pb.ResponseOp{
|
||||
Response: &v3pb.ResponseOp_ResponseRange{
|
||||
(*v3pb.RangeResponse)(resp),
|
||||
},
|
||||
}
|
||||
}
|
||||
return resps, true
|
||||
}
|
||||
45
clientv3/leasing/doc.go
Normal file
45
clientv3/leasing/doc.go
Normal file
@@ -0,0 +1,45 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// Package leasing is a clientv3 wrapper that provides the client exclusive write access to a key by acquiring a lease and be lineraizably
|
||||
// served locally. This leasing layer can either directly wrap the etcd client or
|
||||
// it can be exposed through the etcd grace proxy server, granting multiple clients write access.
|
||||
//
|
||||
// First, create a leasing client interface:
|
||||
//
|
||||
// leasingCli,error = leasing.NewKV(cli.KV, "leasing-prefix")
|
||||
// if error != nil {
|
||||
// //handle error
|
||||
// }
|
||||
//
|
||||
// The first range request acquires the lease by adding the leasing key ("leasing-prefix"/key) on the server and stores the key locally.
|
||||
// Further linearized read requests using 'cli.leasing' will be served locally as long as the lease exists:
|
||||
// cli.Put(context.TODO(), "abc", "123")
|
||||
//
|
||||
// Lease Acquisition:
|
||||
// leasingCli.Get(context.TODO(), "abc")
|
||||
//
|
||||
// Local reads:
|
||||
// resp,_ := leasingCli.Get(context.TODO(), "abc")
|
||||
// fmt.Printf("%s\n", resp.Kvs[0].Value)
|
||||
// //Output: 123 (served locally)
|
||||
//
|
||||
// Lease Revocation:
|
||||
// If a client writes to the key owned by the leasing client,then the leasing client gives up its lease allowing the client to modify the key.
|
||||
// cli.Put(context.TODO(), "abc", "456")
|
||||
// resp, _ = leasingCli.Get("abc")
|
||||
// fmt.Printf("%s\n", resp.Kvs[0].Value)
|
||||
// // Output: 456 (fetched from server)
|
||||
//
|
||||
package leasing
|
||||
431
clientv3/leasing/kv.go
Normal file
431
clientv3/leasing/kv.go
Normal file
@@ -0,0 +1,431 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package leasing
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
v3 "github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/clientv3/concurrency"
|
||||
"github.com/coreos/etcd/mvcc/mvccpb"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type leasingKV struct {
|
||||
cl *v3.Client
|
||||
kv v3.KV
|
||||
pfx string
|
||||
leases leaseCache
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
||||
sessionOpts []concurrency.SessionOption
|
||||
session *concurrency.Session
|
||||
sessionc chan struct{}
|
||||
}
|
||||
|
||||
var closedCh chan struct{}
|
||||
|
||||
func init() {
|
||||
closedCh = make(chan struct{})
|
||||
close(closedCh)
|
||||
}
|
||||
|
||||
// NewKV wraps a KV instance so that all requests are wired through a leasing protocol.
|
||||
func NewKV(cl *v3.Client, pfx string, opts ...concurrency.SessionOption) (v3.KV, error) {
|
||||
cctx, cancel := context.WithCancel(cl.Ctx())
|
||||
lkv := leasingKV{
|
||||
cl: cl,
|
||||
kv: cl.KV,
|
||||
pfx: pfx,
|
||||
leases: leaseCache{revokes: make(map[string]time.Time)},
|
||||
ctx: cctx,
|
||||
cancel: cancel,
|
||||
sessionOpts: opts,
|
||||
sessionc: make(chan struct{}),
|
||||
}
|
||||
go lkv.monitorSession()
|
||||
go lkv.leases.clearOldRevokes(cctx)
|
||||
return &lkv, lkv.waitSession(cctx)
|
||||
}
|
||||
|
||||
func (lkv *leasingKV) Get(ctx context.Context, key string, opts ...v3.OpOption) (*v3.GetResponse, error) {
|
||||
return lkv.get(ctx, v3.OpGet(key, opts...))
|
||||
}
|
||||
|
||||
func (lkv *leasingKV) Put(ctx context.Context, key, val string, opts ...v3.OpOption) (*v3.PutResponse, error) {
|
||||
return lkv.put(ctx, v3.OpPut(key, val, opts...))
|
||||
}
|
||||
|
||||
func (lkv *leasingKV) Delete(ctx context.Context, key string, opts ...v3.OpOption) (*v3.DeleteResponse, error) {
|
||||
return lkv.delete(ctx, v3.OpDelete(key, opts...))
|
||||
}
|
||||
|
||||
func (lkv *leasingKV) Do(ctx context.Context, op v3.Op) (v3.OpResponse, error) {
|
||||
switch {
|
||||
case op.IsGet():
|
||||
resp, err := lkv.get(ctx, op)
|
||||
return resp.OpResponse(), err
|
||||
case op.IsPut():
|
||||
resp, err := lkv.put(ctx, op)
|
||||
return resp.OpResponse(), err
|
||||
case op.IsDelete():
|
||||
resp, err := lkv.delete(ctx, op)
|
||||
return resp.OpResponse(), err
|
||||
case op.IsTxn():
|
||||
cmps, thenOps, elseOps := op.Txn()
|
||||
resp, err := lkv.Txn(ctx).If(cmps...).Then(thenOps...).Else(elseOps...).Commit()
|
||||
return resp.OpResponse(), err
|
||||
}
|
||||
return v3.OpResponse{}, nil
|
||||
}
|
||||
|
||||
func (lkv *leasingKV) Compact(ctx context.Context, rev int64, opts ...v3.CompactOption) (*v3.CompactResponse, error) {
|
||||
return lkv.kv.Compact(ctx, rev, opts...)
|
||||
}
|
||||
|
||||
func (lkv *leasingKV) Txn(ctx context.Context) v3.Txn {
|
||||
return &txnLeasing{Txn: lkv.kv.Txn(ctx), lkv: lkv, ctx: ctx}
|
||||
}
|
||||
|
||||
func (lkv *leasingKV) monitorSession() {
|
||||
for lkv.ctx.Err() == nil {
|
||||
if lkv.session != nil {
|
||||
select {
|
||||
case <-lkv.session.Done():
|
||||
case <-lkv.ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
lkv.leases.mu.Lock()
|
||||
select {
|
||||
case <-lkv.sessionc:
|
||||
lkv.sessionc = make(chan struct{})
|
||||
default:
|
||||
}
|
||||
lkv.leases.entries = make(map[string]*leaseKey)
|
||||
lkv.leases.mu.Unlock()
|
||||
|
||||
s, err := concurrency.NewSession(lkv.cl, lkv.sessionOpts...)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
lkv.leases.mu.Lock()
|
||||
lkv.session = s
|
||||
close(lkv.sessionc)
|
||||
lkv.leases.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (lkv *leasingKV) monitorLease(ctx context.Context, key string, rev int64) {
|
||||
cctx, cancel := context.WithCancel(lkv.ctx)
|
||||
defer cancel()
|
||||
for cctx.Err() == nil {
|
||||
if rev == 0 {
|
||||
resp, err := lkv.kv.Get(ctx, lkv.pfx+key)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
rev = resp.Header.Revision
|
||||
if len(resp.Kvs) == 0 || string(resp.Kvs[0].Value) == "REVOKE" {
|
||||
lkv.rescind(cctx, key, rev)
|
||||
return
|
||||
}
|
||||
}
|
||||
wch := lkv.cl.Watch(cctx, lkv.pfx+key, v3.WithRev(rev+1))
|
||||
for resp := range wch {
|
||||
for _, ev := range resp.Events {
|
||||
if string(ev.Kv.Value) != "REVOKE" {
|
||||
continue
|
||||
}
|
||||
if v3.LeaseID(ev.Kv.Lease) == lkv.leaseID() {
|
||||
lkv.rescind(cctx, key, ev.Kv.ModRevision)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
rev = 0
|
||||
}
|
||||
}
|
||||
|
||||
// rescind releases a lease from this client.
|
||||
func (lkv *leasingKV) rescind(ctx context.Context, key string, rev int64) {
|
||||
if lkv.leases.Evict(key) > rev {
|
||||
return
|
||||
}
|
||||
cmp := v3.Compare(v3.CreateRevision(lkv.pfx+key), "<", rev)
|
||||
op := v3.OpDelete(lkv.pfx + key)
|
||||
for ctx.Err() == nil {
|
||||
if _, err := lkv.kv.Txn(ctx).If(cmp).Then(op).Commit(); err == nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (lkv *leasingKV) waitRescind(ctx context.Context, key string, rev int64) error {
|
||||
cctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
wch := lkv.cl.Watch(cctx, lkv.pfx+key, v3.WithRev(rev+1))
|
||||
for resp := range wch {
|
||||
for _, ev := range resp.Events {
|
||||
if ev.Type == v3.EventTypeDelete {
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
func (lkv *leasingKV) tryModifyOp(ctx context.Context, op v3.Op) (*v3.TxnResponse, chan<- struct{}, error) {
|
||||
key := string(op.KeyBytes())
|
||||
wc, rev := lkv.leases.Lock(key)
|
||||
cmp := v3.Compare(v3.CreateRevision(lkv.pfx+key), "<", rev+1)
|
||||
resp, err := lkv.kv.Txn(ctx).If(cmp).Then(op).Commit()
|
||||
switch {
|
||||
case err != nil:
|
||||
lkv.leases.Evict(key)
|
||||
fallthrough
|
||||
case !resp.Succeeded:
|
||||
if wc != nil {
|
||||
close(wc)
|
||||
}
|
||||
return nil, nil, err
|
||||
}
|
||||
return resp, wc, nil
|
||||
}
|
||||
|
||||
func (lkv *leasingKV) put(ctx context.Context, op v3.Op) (pr *v3.PutResponse, err error) {
|
||||
if err := lkv.waitSession(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for ctx.Err() == nil {
|
||||
resp, wc, err := lkv.tryModifyOp(ctx, op)
|
||||
if err != nil || wc == nil {
|
||||
resp, err = lkv.revoke(ctx, string(op.KeyBytes()), op)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if resp.Succeeded {
|
||||
lkv.leases.mu.Lock()
|
||||
lkv.leases.Update(op.KeyBytes(), op.ValueBytes(), resp.Header)
|
||||
lkv.leases.mu.Unlock()
|
||||
pr = (*v3.PutResponse)(resp.Responses[0].GetResponsePut())
|
||||
pr.Header = resp.Header
|
||||
}
|
||||
if wc != nil {
|
||||
close(wc)
|
||||
}
|
||||
if resp.Succeeded {
|
||||
return pr, nil
|
||||
}
|
||||
}
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
|
||||
func (lkv *leasingKV) acquire(ctx context.Context, key string, op v3.Op) (*v3.TxnResponse, error) {
|
||||
if err := lkv.waitSession(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return lkv.kv.Txn(ctx).If(
|
||||
v3.Compare(v3.CreateRevision(lkv.pfx+key), "=", 0)).
|
||||
Then(
|
||||
op,
|
||||
v3.OpPut(lkv.pfx+key, "", v3.WithLease(lkv.leaseID()))).
|
||||
Else(op).
|
||||
Commit()
|
||||
}
|
||||
|
||||
func (lkv *leasingKV) get(ctx context.Context, op v3.Op) (*v3.GetResponse, error) {
|
||||
do := func() (*v3.GetResponse, error) {
|
||||
r, err := lkv.kv.Do(ctx, op)
|
||||
return r.Get(), err
|
||||
}
|
||||
if !lkv.readySession() {
|
||||
return do()
|
||||
}
|
||||
|
||||
if resp, ok := lkv.leases.Get(ctx, op); resp != nil {
|
||||
return resp, nil
|
||||
} else if !ok || op.IsSerializable() {
|
||||
// must be handled by server or can skip linearization
|
||||
return do()
|
||||
}
|
||||
|
||||
key := string(op.KeyBytes())
|
||||
if !lkv.leases.MayAcquire(key) {
|
||||
resp, err := lkv.kv.Do(ctx, op)
|
||||
return resp.Get(), err
|
||||
}
|
||||
|
||||
resp, err := lkv.acquire(ctx, key, v3.OpGet(key))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
getResp := (*v3.GetResponse)(resp.Responses[0].GetResponseRange())
|
||||
getResp.Header = resp.Header
|
||||
if resp.Succeeded {
|
||||
getResp = lkv.leases.Add(key, getResp, op)
|
||||
go lkv.monitorLease(ctx, key, resp.Header.Revision)
|
||||
}
|
||||
return getResp, nil
|
||||
}
|
||||
|
||||
func (lkv *leasingKV) deleteRangeRPC(ctx context.Context, maxLeaseRev int64, key, end string) (*v3.DeleteResponse, error) {
|
||||
lkey, lend := lkv.pfx+key, lkv.pfx+end
|
||||
resp, err := lkv.kv.Txn(ctx).If(
|
||||
v3.Compare(v3.CreateRevision(lkey).WithRange(lend), "<", maxLeaseRev+1),
|
||||
).Then(
|
||||
v3.OpGet(key, v3.WithRange(end), v3.WithKeysOnly()),
|
||||
v3.OpDelete(key, v3.WithRange(end)),
|
||||
).Commit()
|
||||
if err != nil {
|
||||
lkv.leases.EvictRange(key, end)
|
||||
return nil, err
|
||||
}
|
||||
if !resp.Succeeded {
|
||||
return nil, nil
|
||||
}
|
||||
for _, kv := range resp.Responses[0].GetResponseRange().Kvs {
|
||||
lkv.leases.Delete(string(kv.Key), resp.Header)
|
||||
}
|
||||
delResp := (*v3.DeleteResponse)(resp.Responses[1].GetResponseDeleteRange())
|
||||
delResp.Header = resp.Header
|
||||
return delResp, nil
|
||||
}
|
||||
|
||||
func (lkv *leasingKV) deleteRange(ctx context.Context, op v3.Op) (*v3.DeleteResponse, error) {
|
||||
key, end := string(op.KeyBytes()), string(op.RangeBytes())
|
||||
for ctx.Err() == nil {
|
||||
maxLeaseRev, err := lkv.revokeRange(ctx, key, end)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
wcs := lkv.leases.LockRange(key, end)
|
||||
delResp, err := lkv.deleteRangeRPC(ctx, maxLeaseRev, key, end)
|
||||
closeAll(wcs)
|
||||
if err != nil || delResp != nil {
|
||||
return delResp, err
|
||||
}
|
||||
}
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
|
||||
func (lkv *leasingKV) delete(ctx context.Context, op v3.Op) (dr *v3.DeleteResponse, err error) {
|
||||
if err := lkv.waitSession(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(op.RangeBytes()) > 0 {
|
||||
return lkv.deleteRange(ctx, op)
|
||||
}
|
||||
key := string(op.KeyBytes())
|
||||
for ctx.Err() == nil {
|
||||
resp, wc, err := lkv.tryModifyOp(ctx, op)
|
||||
if err != nil || wc == nil {
|
||||
resp, err = lkv.revoke(ctx, key, op)
|
||||
}
|
||||
if err != nil {
|
||||
// don't know if delete was processed
|
||||
lkv.leases.Evict(key)
|
||||
return nil, err
|
||||
}
|
||||
if resp.Succeeded {
|
||||
dr = (*v3.DeleteResponse)(resp.Responses[0].GetResponseDeleteRange())
|
||||
dr.Header = resp.Header
|
||||
lkv.leases.Delete(key, dr.Header)
|
||||
}
|
||||
if wc != nil {
|
||||
close(wc)
|
||||
}
|
||||
if resp.Succeeded {
|
||||
return dr, nil
|
||||
}
|
||||
}
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
|
||||
func (lkv *leasingKV) revoke(ctx context.Context, key string, op v3.Op) (*v3.TxnResponse, error) {
|
||||
rev := lkv.leases.Rev(key)
|
||||
txn := lkv.kv.Txn(ctx).If(v3.Compare(v3.CreateRevision(lkv.pfx+key), "<", rev+1)).Then(op)
|
||||
resp, err := txn.Else(v3.OpPut(lkv.pfx+key, "REVOKE", v3.WithIgnoreLease())).Commit()
|
||||
if err != nil || resp.Succeeded {
|
||||
return resp, err
|
||||
}
|
||||
return resp, lkv.waitRescind(ctx, key, resp.Header.Revision)
|
||||
}
|
||||
|
||||
func (lkv *leasingKV) revokeRange(ctx context.Context, begin, end string) (int64, error) {
|
||||
lkey, lend := lkv.pfx+begin, ""
|
||||
if len(end) > 0 {
|
||||
lend = lkv.pfx + end
|
||||
}
|
||||
leaseKeys, err := lkv.kv.Get(ctx, lkey, v3.WithRange(lend))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return lkv.revokeLeaseKvs(ctx, leaseKeys.Kvs)
|
||||
}
|
||||
|
||||
func (lkv *leasingKV) revokeLeaseKvs(ctx context.Context, kvs []*mvccpb.KeyValue) (int64, error) {
|
||||
maxLeaseRev := int64(0)
|
||||
for _, kv := range kvs {
|
||||
if rev := kv.CreateRevision; rev > maxLeaseRev {
|
||||
maxLeaseRev = rev
|
||||
}
|
||||
if v3.LeaseID(kv.Lease) == lkv.leaseID() {
|
||||
// don't revoke own keys
|
||||
continue
|
||||
}
|
||||
key := strings.TrimPrefix(string(kv.Key), lkv.pfx)
|
||||
if _, err := lkv.revoke(ctx, key, v3.OpGet(key)); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
return maxLeaseRev, nil
|
||||
}
|
||||
|
||||
func (lkv *leasingKV) waitSession(ctx context.Context) error {
|
||||
select {
|
||||
case <-lkv.sessionc:
|
||||
return nil
|
||||
case <-lkv.ctx.Done():
|
||||
return lkv.ctx.Err()
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (lkv *leasingKV) readySession() bool {
|
||||
lkv.leases.mu.RLock()
|
||||
defer lkv.leases.mu.RUnlock()
|
||||
if lkv.session == nil {
|
||||
return false
|
||||
}
|
||||
select {
|
||||
case <-lkv.session.Done():
|
||||
default:
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (lkv *leasingKV) leaseID() v3.LeaseID {
|
||||
lkv.leases.mu.RLock()
|
||||
defer lkv.leases.mu.RUnlock()
|
||||
return lkv.session.Lease()
|
||||
}
|
||||
223
clientv3/leasing/txn.go
Normal file
223
clientv3/leasing/txn.go
Normal file
@@ -0,0 +1,223 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package leasing
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
v3 "github.com/coreos/etcd/clientv3"
|
||||
v3pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
)
|
||||
|
||||
type txnLeasing struct {
|
||||
v3.Txn
|
||||
lkv *leasingKV
|
||||
ctx context.Context
|
||||
cs []v3.Cmp
|
||||
opst []v3.Op
|
||||
opse []v3.Op
|
||||
}
|
||||
|
||||
func (txn *txnLeasing) If(cs ...v3.Cmp) v3.Txn {
|
||||
txn.cs = append(txn.cs, cs...)
|
||||
txn.Txn = txn.Txn.If(cs...)
|
||||
return txn
|
||||
}
|
||||
|
||||
func (txn *txnLeasing) Then(ops ...v3.Op) v3.Txn {
|
||||
txn.opst = append(txn.opst, ops...)
|
||||
txn.Txn = txn.Txn.Then(ops...)
|
||||
return txn
|
||||
}
|
||||
|
||||
func (txn *txnLeasing) Else(ops ...v3.Op) v3.Txn {
|
||||
txn.opse = append(txn.opse, ops...)
|
||||
txn.Txn = txn.Txn.Else(ops...)
|
||||
return txn
|
||||
}
|
||||
|
||||
func (txn *txnLeasing) Commit() (*v3.TxnResponse, error) {
|
||||
if resp, err := txn.eval(); resp != nil || err != nil {
|
||||
return resp, err
|
||||
}
|
||||
return txn.serverTxn()
|
||||
}
|
||||
|
||||
func (txn *txnLeasing) eval() (*v3.TxnResponse, error) {
|
||||
// TODO: wait on keys in comparisons
|
||||
thenOps, elseOps := gatherOps(txn.opst), gatherOps(txn.opse)
|
||||
ops := make([]v3.Op, 0, len(thenOps)+len(elseOps))
|
||||
ops = append(ops, thenOps...)
|
||||
ops = append(ops, elseOps...)
|
||||
|
||||
for _, ch := range txn.lkv.leases.NotifyOps(ops) {
|
||||
select {
|
||||
case <-ch:
|
||||
case <-txn.ctx.Done():
|
||||
return nil, txn.ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
txn.lkv.leases.mu.RLock()
|
||||
defer txn.lkv.leases.mu.RUnlock()
|
||||
succeeded, ok := txn.lkv.leases.evalCmp(txn.cs)
|
||||
if !ok || txn.lkv.leases.header == nil {
|
||||
return nil, nil
|
||||
}
|
||||
if ops = txn.opst; !succeeded {
|
||||
ops = txn.opse
|
||||
}
|
||||
|
||||
resps, ok := txn.lkv.leases.evalOps(ops)
|
||||
if !ok {
|
||||
return nil, nil
|
||||
}
|
||||
return &v3.TxnResponse{copyHeader(txn.lkv.leases.header), succeeded, resps}, nil
|
||||
}
|
||||
|
||||
// fallback computes the ops to fetch all possible conflicting
|
||||
// leasing keys for a list of ops.
|
||||
func (txn *txnLeasing) fallback(ops []v3.Op) (fbOps []v3.Op) {
|
||||
for _, op := range ops {
|
||||
if op.IsGet() {
|
||||
continue
|
||||
}
|
||||
lkey, lend := txn.lkv.pfx+string(op.KeyBytes()), ""
|
||||
if len(op.RangeBytes()) > 0 {
|
||||
lend = txn.lkv.pfx + string(op.RangeBytes())
|
||||
}
|
||||
fbOps = append(fbOps, v3.OpGet(lkey, v3.WithRange(lend)))
|
||||
}
|
||||
return fbOps
|
||||
}
|
||||
|
||||
func (txn *txnLeasing) guardKeys(ops []v3.Op) (cmps []v3.Cmp) {
|
||||
seen := make(map[string]bool)
|
||||
for _, op := range ops {
|
||||
key := string(op.KeyBytes())
|
||||
if op.IsGet() || len(op.RangeBytes()) != 0 || seen[key] {
|
||||
continue
|
||||
}
|
||||
rev := txn.lkv.leases.Rev(key)
|
||||
cmps = append(cmps, v3.Compare(v3.CreateRevision(txn.lkv.pfx+key), "<", rev+1))
|
||||
seen[key] = true
|
||||
}
|
||||
return cmps
|
||||
}
|
||||
|
||||
func (txn *txnLeasing) guardRanges(ops []v3.Op) (cmps []v3.Cmp, err error) {
|
||||
for _, op := range ops {
|
||||
if op.IsGet() || len(op.RangeBytes()) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
key, end := string(op.KeyBytes()), string(op.RangeBytes())
|
||||
maxRevLK, err := txn.lkv.revokeRange(txn.ctx, key, end)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
opts := append(v3.WithLastRev(), v3.WithRange(end))
|
||||
getResp, err := txn.lkv.kv.Get(txn.ctx, key, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
maxModRev := int64(0)
|
||||
if len(getResp.Kvs) > 0 {
|
||||
maxModRev = getResp.Kvs[0].ModRevision
|
||||
}
|
||||
|
||||
noKeyUpdate := v3.Compare(v3.ModRevision(key).WithRange(end), "<", maxModRev+1)
|
||||
noLeaseUpdate := v3.Compare(
|
||||
v3.CreateRevision(txn.lkv.pfx+key).WithRange(txn.lkv.pfx+end),
|
||||
"<",
|
||||
maxRevLK+1)
|
||||
cmps = append(cmps, noKeyUpdate, noLeaseUpdate)
|
||||
}
|
||||
return cmps, nil
|
||||
}
|
||||
|
||||
func (txn *txnLeasing) guard(ops []v3.Op) ([]v3.Cmp, error) {
|
||||
cmps := txn.guardKeys(ops)
|
||||
rangeCmps, err := txn.guardRanges(ops)
|
||||
return append(cmps, rangeCmps...), err
|
||||
}
|
||||
|
||||
func (txn *txnLeasing) commitToCache(txnResp *v3pb.TxnResponse, userTxn v3.Op) {
|
||||
ops := gatherResponseOps(txnResp.Responses, []v3.Op{userTxn})
|
||||
txn.lkv.leases.mu.Lock()
|
||||
for _, op := range ops {
|
||||
key := string(op.KeyBytes())
|
||||
if op.IsDelete() && len(op.RangeBytes()) > 0 {
|
||||
end := string(op.RangeBytes())
|
||||
for k := range txn.lkv.leases.entries {
|
||||
if inRange(k, key, end) {
|
||||
txn.lkv.leases.delete(k, txnResp.Header)
|
||||
}
|
||||
}
|
||||
} else if op.IsDelete() {
|
||||
txn.lkv.leases.delete(key, txnResp.Header)
|
||||
}
|
||||
if op.IsPut() {
|
||||
txn.lkv.leases.Update(op.KeyBytes(), op.ValueBytes(), txnResp.Header)
|
||||
}
|
||||
}
|
||||
txn.lkv.leases.mu.Unlock()
|
||||
}
|
||||
|
||||
func (txn *txnLeasing) revokeFallback(fbResps []*v3pb.ResponseOp) error {
|
||||
for _, resp := range fbResps {
|
||||
_, err := txn.lkv.revokeLeaseKvs(txn.ctx, resp.GetResponseRange().Kvs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (txn *txnLeasing) serverTxn() (*v3.TxnResponse, error) {
|
||||
if err := txn.lkv.waitSession(txn.ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
userOps := gatherOps(append(txn.opst, txn.opse...))
|
||||
userTxn := v3.OpTxn(txn.cs, txn.opst, txn.opse)
|
||||
fbOps := txn.fallback(userOps)
|
||||
|
||||
defer closeAll(txn.lkv.leases.LockWriteOps(userOps))
|
||||
for {
|
||||
cmps, err := txn.guard(userOps)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp, err := txn.lkv.kv.Txn(txn.ctx).If(cmps...).Then(userTxn).Else(fbOps...).Commit()
|
||||
if err != nil {
|
||||
for _, cmp := range cmps {
|
||||
txn.lkv.leases.Evict(strings.TrimPrefix(string(cmp.Key), txn.lkv.pfx))
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
if resp.Succeeded {
|
||||
txn.commitToCache((*v3pb.TxnResponse)(resp), userTxn)
|
||||
userResp := resp.Responses[0].GetResponseTxn()
|
||||
userResp.Header = resp.Header
|
||||
return (*v3.TxnResponse)(userResp), nil
|
||||
}
|
||||
if err := txn.revokeFallback(resp.Responses); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
108
clientv3/leasing/util.go
Normal file
108
clientv3/leasing/util.go
Normal file
@@ -0,0 +1,108 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package leasing
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
v3 "github.com/coreos/etcd/clientv3"
|
||||
v3pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
)
|
||||
|
||||
func compareInt64(a, b int64) int {
|
||||
switch {
|
||||
case a < b:
|
||||
return -1
|
||||
case a > b:
|
||||
return 1
|
||||
default:
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
func evalCmp(resp *v3.GetResponse, tcmp v3.Cmp) bool {
|
||||
var result int
|
||||
if len(resp.Kvs) != 0 {
|
||||
kv := resp.Kvs[0]
|
||||
switch tcmp.Target {
|
||||
case v3pb.Compare_VALUE:
|
||||
if tv, _ := tcmp.TargetUnion.(*v3pb.Compare_Value); tv != nil {
|
||||
result = bytes.Compare(kv.Value, tv.Value)
|
||||
}
|
||||
case v3pb.Compare_CREATE:
|
||||
if tv, _ := tcmp.TargetUnion.(*v3pb.Compare_CreateRevision); tv != nil {
|
||||
result = compareInt64(kv.CreateRevision, tv.CreateRevision)
|
||||
}
|
||||
case v3pb.Compare_MOD:
|
||||
if tv, _ := tcmp.TargetUnion.(*v3pb.Compare_ModRevision); tv != nil {
|
||||
result = compareInt64(kv.ModRevision, tv.ModRevision)
|
||||
}
|
||||
case v3pb.Compare_VERSION:
|
||||
if tv, _ := tcmp.TargetUnion.(*v3pb.Compare_Version); tv != nil {
|
||||
result = compareInt64(kv.Version, tv.Version)
|
||||
}
|
||||
}
|
||||
}
|
||||
switch tcmp.Result {
|
||||
case v3pb.Compare_EQUAL:
|
||||
return result == 0
|
||||
case v3pb.Compare_NOT_EQUAL:
|
||||
return result != 0
|
||||
case v3pb.Compare_GREATER:
|
||||
return result > 0
|
||||
case v3pb.Compare_LESS:
|
||||
return result < 0
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func gatherOps(ops []v3.Op) (ret []v3.Op) {
|
||||
for _, op := range ops {
|
||||
if !op.IsTxn() {
|
||||
ret = append(ret, op)
|
||||
continue
|
||||
}
|
||||
_, thenOps, elseOps := op.Txn()
|
||||
ret = append(ret, gatherOps(append(thenOps, elseOps...))...)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func gatherResponseOps(resp []*v3pb.ResponseOp, ops []v3.Op) (ret []v3.Op) {
|
||||
for i, op := range ops {
|
||||
if !op.IsTxn() {
|
||||
ret = append(ret, op)
|
||||
continue
|
||||
}
|
||||
_, thenOps, elseOps := op.Txn()
|
||||
if txnResp := resp[i].GetResponseTxn(); txnResp.Succeeded {
|
||||
ret = append(ret, gatherResponseOps(txnResp.Responses, thenOps)...)
|
||||
} else {
|
||||
ret = append(ret, gatherResponseOps(txnResp.Responses, elseOps)...)
|
||||
}
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func copyHeader(hdr *v3pb.ResponseHeader) *v3pb.ResponseHeader {
|
||||
h := *hdr
|
||||
return &h
|
||||
}
|
||||
|
||||
func closeAll(chs []chan<- struct{}) {
|
||||
for _, ch := range chs {
|
||||
close(ch)
|
||||
}
|
||||
}
|
||||
@@ -89,6 +89,45 @@ func (op *Op) WithKeyBytes(key []byte) { op.key = key }
|
||||
// RangeBytes returns the byte slice holding with the Op's range end, if any.
|
||||
func (op Op) RangeBytes() []byte { return op.end }
|
||||
|
||||
// Rev returns the requested revision, if any.
|
||||
func (op Op) Rev() int64 { return op.rev }
|
||||
|
||||
// IsPut returns true iff the operation is a Put.
|
||||
func (op Op) IsPut() bool { return op.t == tPut }
|
||||
|
||||
// IsGet returns true iff the operation is a Get.
|
||||
func (op Op) IsGet() bool { return op.t == tRange }
|
||||
|
||||
// IsDelete returns true iff the operation is a Delete.
|
||||
func (op Op) IsDelete() bool { return op.t == tDeleteRange }
|
||||
|
||||
// IsSerializable returns true if the serializable field is true.
|
||||
func (op Op) IsSerializable() bool { return op.serializable == true }
|
||||
|
||||
// IsKeysOnly returns true if the keysonly field is true.
|
||||
func (op Op) IsKeysOnly() bool { return op.keysOnly == true }
|
||||
|
||||
// IsCountOnly returns true if the countonly field is true.
|
||||
func (op Op) IsCountOnly() bool { return op.countOnly == true }
|
||||
|
||||
// MinModRev returns if field is populated.
|
||||
func (op Op) MinModRev() int64 { return op.minModRev }
|
||||
|
||||
// MaxModRev returns if field is populated.
|
||||
func (op Op) MaxModRev() int64 { return op.maxModRev }
|
||||
|
||||
// MinCreateRev returns if field is populated.
|
||||
func (op Op) MinCreateRev() int64 { return op.minCreateRev }
|
||||
|
||||
// MaxCreateRev returns if field is populated.
|
||||
func (op Op) MaxCreateRev() int64 { return op.maxCreateRev }
|
||||
|
||||
// Limit returns if field is populated.
|
||||
func (op Op) retLimit() int64 { return op.limit }
|
||||
|
||||
// Sort returns if field is populated.
|
||||
func (op Op) retSort() bool { return op.sort != nil }
|
||||
|
||||
// WithRangeBytes sets the byte slice for the Op's range end.
|
||||
func (op *Op) WithRangeBytes(end []byte) { op.end = end }
|
||||
|
||||
|
||||
@@ -195,7 +195,7 @@ var rangeTests = []struct {
|
||||
|
||||
func TestKvOrdering(t *testing.T) {
|
||||
for i, tt := range rangeTests {
|
||||
mKV := &mockKV{clientv3.NewKVFromKVClient(nil), tt.response.ToOpResponse()}
|
||||
mKV := &mockKV{clientv3.NewKVFromKVClient(nil), tt.response.OpResponse()}
|
||||
kv := &kvOrdering{
|
||||
mKV,
|
||||
func(r *clientv3.GetResponse) OrderViolationFunc {
|
||||
@@ -249,7 +249,7 @@ var txnTests = []struct {
|
||||
|
||||
func TestTxnOrdering(t *testing.T) {
|
||||
for i, tt := range txnTests {
|
||||
mKV := &mockKV{clientv3.NewKVFromKVClient(nil), tt.response.ToOpResponse()}
|
||||
mKV := &mockKV{clientv3.NewKVFromKVClient(nil), tt.response.OpResponse()}
|
||||
kv := &kvOrdering{
|
||||
mKV,
|
||||
func(r *clientv3.TxnResponse) OrderViolationFunc {
|
||||
|
||||
@@ -26,6 +26,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/clientv3/leasing"
|
||||
"github.com/coreos/etcd/clientv3/namespace"
|
||||
"github.com/coreos/etcd/clientv3/ordering"
|
||||
"github.com/coreos/etcd/etcdserver/api/etcdhttp"
|
||||
@@ -70,6 +71,7 @@ var (
|
||||
grpcProxyResolverTTL int
|
||||
|
||||
grpcProxyNamespace string
|
||||
grpcProxyLeasing string
|
||||
|
||||
grpcProxyEnablePprof bool
|
||||
grpcProxyEnableOrdering bool
|
||||
@@ -124,7 +126,7 @@ func newGRPCProxyStartCommand() *cobra.Command {
|
||||
|
||||
// experimental flags
|
||||
cmd.Flags().BoolVar(&grpcProxyEnableOrdering, "experimental-serializable-ordering", false, "Ensure serializable reads have monotonically increasing store revisions across endpoints.")
|
||||
|
||||
cmd.Flags().StringVar(&grpcProxyLeasing, "experimental-leasing-prefix", "", "leasing metadata prefix for disconnected linearized reads.")
|
||||
return &cmd
|
||||
}
|
||||
|
||||
@@ -282,6 +284,10 @@ func newGRPCProxyServer(client *clientv3.Client) *grpc.Server {
|
||||
client.Lease = namespace.NewLease(client.Lease, grpcProxyNamespace)
|
||||
}
|
||||
|
||||
if len(grpcProxyLeasing) > 0 {
|
||||
client.KV, _ = leasing.NewKV(client, grpcProxyLeasing)
|
||||
}
|
||||
|
||||
kvp, _ := grpcproxy.NewKvProxy(client)
|
||||
watchp, _ := grpcproxy.NewWatchProxy(client)
|
||||
if grpcProxyResolverPrefix != "" {
|
||||
|
||||
4
test
4
test
@@ -212,14 +212,14 @@ function integration_e2e_pass {
|
||||
wait $e2epid
|
||||
wait $intpid
|
||||
go test -timeout 1m -v ${RACE} -cpu 1,2,4 $@ ${REPO_PATH}/client/integration
|
||||
go test -timeout 10m -v ${RACE} -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration
|
||||
go test -timeout 20m -v ${RACE} -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration
|
||||
go test -timeout 1m -v -cpu 1,2,4 $@ ${REPO_PATH}/contrib/raftexample
|
||||
go test -timeout 1m -v ${RACE} -cpu 1,2,4 -run=Example $@ ${TEST}
|
||||
}
|
||||
|
||||
function grpcproxy_pass {
|
||||
go test -timeout 20m -v ${RACE} -tags cluster_proxy -cpu 1,2,4 $@ ${REPO_PATH}/integration
|
||||
go test -timeout 15m -v ${RACE} -tags cluster_proxy -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration
|
||||
go test -timeout 20m -v ${RACE} -tags cluster_proxy -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration
|
||||
go test -timeout 15m -v -tags cluster_proxy $@ ${REPO_PATH}/e2e
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user