grpcproxy: update cache based on txn response

Fixes more hangs in TestSTMConflict.
This commit is contained in:
Anthony Romano 2016-11-08 21:21:44 -08:00
parent dbb692e50f
commit a4dcceb8aa
2 changed files with 24 additions and 2 deletions

View File

@ -134,14 +134,14 @@ func (c *cache) Invalidate(key, endkey []byte) {
} }
ivs = c.cachedRanges.Stab(ivl) ivs = c.cachedRanges.Stab(ivl)
c.cachedRanges.Delete(ivl)
for _, iv := range ivs { for _, iv := range ivs {
keys := iv.Val.([]string) keys := iv.Val.([]string)
for _, key := range keys { for _, key := range keys {
c.lru.Remove(key) c.lru.Remove(key)
} }
} }
// delete after removing all keys since it is destructive to 'ivs'
c.cachedRanges.Delete(ivl)
} }
// Compact invalidate all caching response before the given rev. // Compact invalidate all caching response before the given rev.

View File

@ -70,6 +70,22 @@ func (p *kvProxy) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*p
return (*pb.DeleteRangeResponse)(resp.Del()), err return (*pb.DeleteRangeResponse)(resp.Del()), err
} }
func (p *kvProxy) txnToCache(reqs []*pb.RequestOp, resps []*pb.ResponseOp) {
for i := range resps {
switch tv := resps[i].Response.(type) {
case *pb.ResponseOp_ResponsePut:
p.cache.Invalidate(reqs[i].GetRequestPut().Key, nil)
case *pb.ResponseOp_ResponseDeleteRange:
rdr := reqs[i].GetRequestDeleteRange()
p.cache.Invalidate(rdr.Key, rdr.RangeEnd)
case *pb.ResponseOp_ResponseRange:
req := reqs[i].GetRequestRange()
req.Serializable = true
p.cache.Add(req, tv.ResponseRange)
}
}
}
func (p *kvProxy) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { func (p *kvProxy) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
txn := p.kv.Txn(ctx) txn := p.kv.Txn(ctx)
cmps := make([]clientv3.Cmp, len(r.Compare)) cmps := make([]clientv3.Cmp, len(r.Compare))
@ -97,6 +113,12 @@ func (p *kvProxy) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, e
for _, cmp := range r.Compare { for _, cmp := range r.Compare {
p.cache.Invalidate(cmp.Key, nil) p.cache.Invalidate(cmp.Key, nil)
} }
// update any fetched keys
if resp.Succeeded {
p.txnToCache(r.Success, resp.Responses)
} else {
p.txnToCache(r.Failure, resp.Responses)
}
return (*pb.TxnResponse)(resp), nil return (*pb.TxnResponse)(resp), nil
} }