From 9fa6c95054447c2b8d65ec85be8ae43df9644a4a Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 11 Jan 2017 13:55:44 -0800 Subject: [PATCH] grpcproxy: use ccache for key cache groupcache needs a write lock and has no way to expire keys; ccache can do this, though. Also removes the key count metric, since there's no way to efficiently calculate it using ccache. --- proxy/grpcproxy/cache/store.go | 36 ++++++++++++++++++---------------- proxy/grpcproxy/kv.go | 7 ------- proxy/grpcproxy/metrics.go | 7 ------- 3 files changed, 19 insertions(+), 31 deletions(-) diff --git a/proxy/grpcproxy/cache/store.go b/proxy/grpcproxy/cache/store.go index 7eb6e4273..b348d1803 100644 --- a/proxy/grpcproxy/cache/store.go +++ b/proxy/grpcproxy/cache/store.go @@ -17,11 +17,13 @@ package cache import ( "errors" "sync" + "time" + + "github.com/karlseguin/ccache" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/adt" - "github.com/golang/groupcache/lru" ) var ( @@ -29,12 +31,14 @@ var ( ErrCompacted = rpctypes.ErrGRPCCompacted ) +const defaultHistoricTTL = time.Hour +const defaultCurrentTTL = time.Minute + type Cache interface { Add(req *pb.RangeRequest, resp *pb.RangeResponse) Get(req *pb.RangeRequest) (*pb.RangeResponse, error) Compact(revision int64) Invalidate(key []byte, endkey []byte) - Size() int } // keyFunc returns the key of an request, which is used to look up in the cache for it's caching response. @@ -49,7 +53,7 @@ func keyFunc(req *pb.RangeRequest) string { func NewCache(maxCacheEntries int) Cache { return &cache{ - lru: lru.New(maxCacheEntries), + lru: ccache.New(ccache.Configure().MaxSize(int64(maxCacheEntries))), compactedRev: -1, } } @@ -57,7 +61,7 @@ func NewCache(maxCacheEntries int) Cache { // cache implements Cache type cache struct { mu sync.RWMutex - lru *lru.Cache + lru *ccache.Cache // a reverse index for cache invalidation cachedRanges adt.IntervalTree @@ -73,7 +77,11 @@ func (c *cache) Add(req *pb.RangeRequest, resp *pb.RangeResponse) { defer c.mu.Unlock() if req.Revision > c.compactedRev { - c.lru.Add(key, resp) + if req.Revision == 0 { + c.lru.Set(key, resp, defaultCurrentTTL) + } else { + c.lru.Set(key, resp, defaultHistoricTTL) + } } // we do not need to invalidate a request with a revision specified. // so we do not need to add it into the reverse index. @@ -105,16 +113,16 @@ func (c *cache) Add(req *pb.RangeRequest, resp *pb.RangeResponse) { func (c *cache) Get(req *pb.RangeRequest) (*pb.RangeResponse, error) { key := keyFunc(req) - c.mu.Lock() - defer c.mu.Unlock() + c.mu.RLock() + defer c.mu.RUnlock() if req.Revision < c.compactedRev { - c.lru.Remove(key) + c.lru.Delete(key) return nil, ErrCompacted } - if resp, ok := c.lru.Get(key); ok { - return resp.(*pb.RangeResponse), nil + if item := c.lru.Get(key); item != nil { + return item.Value().(*pb.RangeResponse), nil } return nil, errors.New("not exist") } @@ -138,7 +146,7 @@ func (c *cache) Invalidate(key, endkey []byte) { for _, iv := range ivs { keys := iv.Val.([]string) for _, key := range keys { - c.lru.Remove(key) + c.lru.Delete(key) } } // delete after removing all keys since it is destructive to 'ivs' @@ -155,9 +163,3 @@ func (c *cache) Compact(revision int64) { c.compactedRev = revision } } - -func (c *cache) Size() int { - c.mu.RLock() - defer c.mu.RUnlock() - return c.lru.Len() -} diff --git a/proxy/grpcproxy/kv.go b/proxy/grpcproxy/kv.go index bfb16c4ed..1d6cf452a 100644 --- a/proxy/grpcproxy/kv.go +++ b/proxy/grpcproxy/kv.go @@ -58,14 +58,12 @@ func (p *kvProxy) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRespo req.Serializable = true gresp := (*pb.RangeResponse)(resp.Get()) p.cache.Add(&req, gresp) - cacheKeys.Set(float64(p.cache.Size())) return gresp, nil } func (p *kvProxy) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { p.cache.Invalidate(r.Key, nil) - cacheKeys.Set(float64(p.cache.Size())) resp, err := p.kv.Do(ctx, PutRequestToOp(r)) return (*pb.PutResponse)(resp.Put()), err @@ -73,7 +71,6 @@ func (p *kvProxy) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, e func (p *kvProxy) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { p.cache.Invalidate(r.Key, r.RangeEnd) - cacheKeys.Set(float64(p.cache.Size())) resp, err := p.kv.Do(ctx, DelRequestToOp(r)) return (*pb.DeleteRangeResponse)(resp.Del()), err @@ -129,8 +126,6 @@ func (p *kvProxy) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, e p.txnToCache(r.Failure, resp.Responses) } - cacheKeys.Set(float64(p.cache.Size())) - return (*pb.TxnResponse)(resp), nil } @@ -145,8 +140,6 @@ func (p *kvProxy) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.Com p.cache.Compact(r.Revision) } - cacheKeys.Set(float64(p.cache.Size())) - return (*pb.CompactionResponse)(resp), err } diff --git a/proxy/grpcproxy/metrics.go b/proxy/grpcproxy/metrics.go index 864fa1609..f4a1d4c8d 100644 --- a/proxy/grpcproxy/metrics.go +++ b/proxy/grpcproxy/metrics.go @@ -29,12 +29,6 @@ var ( Name: "events_coalescing_total", Help: "Total number of events coalescing", }) - cacheKeys = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: "etcd", - Subsystem: "grpc_proxy", - Name: "cache_keys_total", - Help: "Total number of keys/ranges cached", - }) cacheHits = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "etcd", Subsystem: "grpc_proxy", @@ -52,7 +46,6 @@ var ( func init() { prometheus.MustRegister(watchersCoalescing) prometheus.MustRegister(eventsCoalescing) - prometheus.MustRegister(cacheKeys) prometheus.MustRegister(cacheHits) prometheus.MustRegister(cachedMisses) }