From 1fe2a9b124b06b6af84521ec0ccd7827b86c6821 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Fri, 10 Feb 2017 14:27:48 -0800 Subject: [PATCH] Revert "Merge pull request #7139 from heyitsanthony/proxy-rlock" This reverts commit 304606ab0bb009169b442dec6f9af76e4cd1e887, reversing changes made to 7dfe503f1c86de755ca379e95922e43afacee923. --- .../github.com/golang/groupcache/LICENSE | 191 +++++++++++++++ .../github.com/golang/groupcache/lru/lru.go | 121 ++++++++++ .../github.com/karlseguin/ccache/bucket.go | 41 ---- .../github.com/karlseguin/ccache/cache.go | 218 ----------------- .../karlseguin/ccache/configuration.go | 94 -------- .../github.com/karlseguin/ccache/item.go | 103 -------- .../karlseguin/ccache/layeredbucket.go | 82 ------- .../karlseguin/ccache/layeredcache.go | 222 ------------------ .../github.com/karlseguin/ccache/license.txt | 19 -- .../karlseguin/ccache/secondarycache.go | 72 ------ glide.lock | 6 +- glide.yaml | 6 +- proxy/grpcproxy/cache/store.go | 38 ++- proxy/grpcproxy/kv.go | 7 + proxy/grpcproxy/metrics.go | 7 + 15 files changed, 352 insertions(+), 875 deletions(-) create mode 100644 cmd/vendor/github.com/golang/groupcache/LICENSE create mode 100644 cmd/vendor/github.com/golang/groupcache/lru/lru.go delete mode 100644 cmd/vendor/github.com/karlseguin/ccache/bucket.go delete mode 100644 cmd/vendor/github.com/karlseguin/ccache/cache.go delete mode 100644 cmd/vendor/github.com/karlseguin/ccache/configuration.go delete mode 100644 cmd/vendor/github.com/karlseguin/ccache/item.go delete mode 100644 cmd/vendor/github.com/karlseguin/ccache/layeredbucket.go delete mode 100644 cmd/vendor/github.com/karlseguin/ccache/layeredcache.go delete mode 100644 cmd/vendor/github.com/karlseguin/ccache/license.txt delete mode 100644 cmd/vendor/github.com/karlseguin/ccache/secondarycache.go diff --git a/cmd/vendor/github.com/golang/groupcache/LICENSE b/cmd/vendor/github.com/golang/groupcache/LICENSE new file mode 100644 index 000000000..37ec93a14 --- /dev/null +++ b/cmd/vendor/github.com/golang/groupcache/LICENSE @@ -0,0 +1,191 @@ +Apache License +Version 2.0, January 2004 +http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + +"License" shall mean the terms and conditions for use, reproduction, and +distribution as defined by Sections 1 through 9 of this document. + +"Licensor" shall mean the copyright owner or entity authorized by the copyright +owner that is granting the License. + +"Legal Entity" shall mean the union of the acting entity and all other entities +that control, are controlled by, or are under common control with that entity. +For the purposes of this definition, "control" means (i) the power, direct or +indirect, to cause the direction or management of such entity, whether by +contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the +outstanding shares, or (iii) beneficial ownership of such entity. + +"You" (or "Your") shall mean an individual or Legal Entity exercising +permissions granted by this License. + +"Source" form shall mean the preferred form for making modifications, including +but not limited to software source code, documentation source, and configuration +files. + +"Object" form shall mean any form resulting from mechanical transformation or +translation of a Source form, including but not limited to compiled object code, +generated documentation, and conversions to other media types. + +"Work" shall mean the work of authorship, whether in Source or Object form, made +available under the License, as indicated by a copyright notice that is included +in or attached to the work (an example is provided in the Appendix below). + +"Derivative Works" shall mean any work, whether in Source or Object form, that +is based on (or derived from) the Work and for which the editorial revisions, +annotations, elaborations, or other modifications represent, as a whole, an +original work of authorship. For the purposes of this License, Derivative Works +shall not include works that remain separable from, or merely link (or bind by +name) to the interfaces of, the Work and Derivative Works thereof. + +"Contribution" shall mean any work of authorship, including the original version +of the Work and any modifications or additions to that Work or Derivative Works +thereof, that is intentionally submitted to Licensor for inclusion in the Work +by the copyright owner or by an individual or Legal Entity authorized to submit +on behalf of the copyright owner. For the purposes of this definition, +"submitted" means any form of electronic, verbal, or written communication sent +to the Licensor or its representatives, including but not limited to +communication on electronic mailing lists, source code control systems, and +issue tracking systems that are managed by, or on behalf of, the Licensor for +the purpose of discussing and improving the Work, but excluding communication +that is conspicuously marked or otherwise designated in writing by the copyright +owner as "Not a Contribution." + +"Contributor" shall mean Licensor and any individual or Legal Entity on behalf +of whom a Contribution has been received by Licensor and subsequently +incorporated within the Work. + +2. Grant of Copyright License. + +Subject to the terms and conditions of this License, each Contributor hereby +grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, +irrevocable copyright license to reproduce, prepare Derivative Works of, +publicly display, publicly perform, sublicense, and distribute the Work and such +Derivative Works in Source or Object form. + +3. Grant of Patent License. + +Subject to the terms and conditions of this License, each Contributor hereby +grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, +irrevocable (except as stated in this section) patent license to make, have +made, use, offer to sell, sell, import, and otherwise transfer the Work, where +such license applies only to those patent claims licensable by such Contributor +that are necessarily infringed by their Contribution(s) alone or by combination +of their Contribution(s) with the Work to which such Contribution(s) was +submitted. If You institute patent litigation against any entity (including a +cross-claim or counterclaim in a lawsuit) alleging that the Work or a +Contribution incorporated within the Work constitutes direct or contributory +patent infringement, then any patent licenses granted to You under this License +for that Work shall terminate as of the date such litigation is filed. + +4. Redistribution. + +You may reproduce and distribute copies of the Work or Derivative Works thereof +in any medium, with or without modifications, and in Source or Object form, +provided that You meet the following conditions: + +You must give any other recipients of the Work or Derivative Works a copy of +this License; and +You must cause any modified files to carry prominent notices stating that You +changed the files; and +You must retain, in the Source form of any Derivative Works that You distribute, +all copyright, patent, trademark, and attribution notices from the Source form +of the Work, excluding those notices that do not pertain to any part of the +Derivative Works; and +If the Work includes a "NOTICE" text file as part of its distribution, then any +Derivative Works that You distribute must include a readable copy of the +attribution notices contained within such NOTICE file, excluding those notices +that do not pertain to any part of the Derivative Works, in at least one of the +following places: within a NOTICE text file distributed as part of the +Derivative Works; within the Source form or documentation, if provided along +with the Derivative Works; or, within a display generated by the Derivative +Works, if and wherever such third-party notices normally appear. The contents of +the NOTICE file are for informational purposes only and do not modify the +License. You may add Your own attribution notices within Derivative Works that +You distribute, alongside or as an addendum to the NOTICE text from the Work, +provided that such additional attribution notices cannot be construed as +modifying the License. +You may add Your own copyright statement to Your modifications and may provide +additional or different license terms and conditions for use, reproduction, or +distribution of Your modifications, or for any such Derivative Works as a whole, +provided Your use, reproduction, and distribution of the Work otherwise complies +with the conditions stated in this License. + +5. Submission of Contributions. + +Unless You explicitly state otherwise, any Contribution intentionally submitted +for inclusion in the Work by You to the Licensor shall be under the terms and +conditions of this License, without any additional terms or conditions. +Notwithstanding the above, nothing herein shall supersede or modify the terms of +any separate license agreement you may have executed with Licensor regarding +such Contributions. + +6. Trademarks. + +This License does not grant permission to use the trade names, trademarks, +service marks, or product names of the Licensor, except as required for +reasonable and customary use in describing the origin of the Work and +reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. + +Unless required by applicable law or agreed to in writing, Licensor provides the +Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, +including, without limitation, any warranties or conditions of TITLE, +NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are +solely responsible for determining the appropriateness of using or +redistributing the Work and assume any risks associated with Your exercise of +permissions under this License. + +8. Limitation of Liability. + +In no event and under no legal theory, whether in tort (including negligence), +contract, or otherwise, unless required by applicable law (such as deliberate +and grossly negligent acts) or agreed to in writing, shall any Contributor be +liable to You for damages, including any direct, indirect, special, incidental, +or consequential damages of any character arising as a result of this License or +out of the use or inability to use the Work (including but not limited to +damages for loss of goodwill, work stoppage, computer failure or malfunction, or +any and all other commercial damages or losses), even if such Contributor has +been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. + +While redistributing the Work or Derivative Works thereof, You may choose to +offer, and charge a fee for, acceptance of support, warranty, indemnity, or +other liability obligations and/or rights consistent with this License. However, +in accepting such obligations, You may act only on Your own behalf and on Your +sole responsibility, not on behalf of any other Contributor, and only if You +agree to indemnify, defend, and hold each Contributor harmless for any liability +incurred by, or claims asserted against, such Contributor by reason of your +accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work + +To apply the Apache License to your work, attach the following boilerplate +notice, with the fields enclosed by brackets "[]" replaced with your own +identifying information. (Don't include the brackets!) The text should be +enclosed in the appropriate comment syntax for the file format. We also +recommend that a file or class name and description of purpose be included on +the same "printed page" as the copyright notice for easier identification within +third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/cmd/vendor/github.com/golang/groupcache/lru/lru.go b/cmd/vendor/github.com/golang/groupcache/lru/lru.go new file mode 100644 index 000000000..cdfe2991f --- /dev/null +++ b/cmd/vendor/github.com/golang/groupcache/lru/lru.go @@ -0,0 +1,121 @@ +/* +Copyright 2013 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package lru implements an LRU cache. +package lru + +import "container/list" + +// Cache is an LRU cache. It is not safe for concurrent access. +type Cache struct { + // MaxEntries is the maximum number of cache entries before + // an item is evicted. Zero means no limit. + MaxEntries int + + // OnEvicted optionally specificies a callback function to be + // executed when an entry is purged from the cache. + OnEvicted func(key Key, value interface{}) + + ll *list.List + cache map[interface{}]*list.Element +} + +// A Key may be any value that is comparable. See http://golang.org/ref/spec#Comparison_operators +type Key interface{} + +type entry struct { + key Key + value interface{} +} + +// New creates a new Cache. +// If maxEntries is zero, the cache has no limit and it's assumed +// that eviction is done by the caller. +func New(maxEntries int) *Cache { + return &Cache{ + MaxEntries: maxEntries, + ll: list.New(), + cache: make(map[interface{}]*list.Element), + } +} + +// Add adds a value to the cache. +func (c *Cache) Add(key Key, value interface{}) { + if c.cache == nil { + c.cache = make(map[interface{}]*list.Element) + c.ll = list.New() + } + if ee, ok := c.cache[key]; ok { + c.ll.MoveToFront(ee) + ee.Value.(*entry).value = value + return + } + ele := c.ll.PushFront(&entry{key, value}) + c.cache[key] = ele + if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries { + c.RemoveOldest() + } +} + +// Get looks up a key's value from the cache. +func (c *Cache) Get(key Key) (value interface{}, ok bool) { + if c.cache == nil { + return + } + if ele, hit := c.cache[key]; hit { + c.ll.MoveToFront(ele) + return ele.Value.(*entry).value, true + } + return +} + +// Remove removes the provided key from the cache. +func (c *Cache) Remove(key Key) { + if c.cache == nil { + return + } + if ele, hit := c.cache[key]; hit { + c.removeElement(ele) + } +} + +// RemoveOldest removes the oldest item from the cache. +func (c *Cache) RemoveOldest() { + if c.cache == nil { + return + } + ele := c.ll.Back() + if ele != nil { + c.removeElement(ele) + } +} + +func (c *Cache) removeElement(e *list.Element) { + c.ll.Remove(e) + kv := e.Value.(*entry) + delete(c.cache, kv.key) + if c.OnEvicted != nil { + c.OnEvicted(kv.key, kv.value) + } +} + +// Len returns the number of items in the cache. +func (c *Cache) Len() int { + if c.cache == nil { + return 0 + } + return c.ll.Len() +} diff --git a/cmd/vendor/github.com/karlseguin/ccache/bucket.go b/cmd/vendor/github.com/karlseguin/ccache/bucket.go deleted file mode 100644 index d67535170..000000000 --- a/cmd/vendor/github.com/karlseguin/ccache/bucket.go +++ /dev/null @@ -1,41 +0,0 @@ -package ccache - -import ( - "sync" - "time" -) - -type bucket struct { - sync.RWMutex - lookup map[string]*Item -} - -func (b *bucket) get(key string) *Item { - b.RLock() - defer b.RUnlock() - return b.lookup[key] -} - -func (b *bucket) set(key string, value interface{}, duration time.Duration) (*Item, *Item) { - expires := time.Now().Add(duration).UnixNano() - item := newItem(key, value, expires) - b.Lock() - defer b.Unlock() - existing := b.lookup[key] - b.lookup[key] = item - return item, existing -} - -func (b *bucket) delete(key string) *Item { - b.Lock() - defer b.Unlock() - item := b.lookup[key] - delete(b.lookup, key) - return item -} - -func (b *bucket) clear() { - b.Lock() - defer b.Unlock() - b.lookup = make(map[string]*Item) -} diff --git a/cmd/vendor/github.com/karlseguin/ccache/cache.go b/cmd/vendor/github.com/karlseguin/ccache/cache.go deleted file mode 100644 index 998d1dbb3..000000000 --- a/cmd/vendor/github.com/karlseguin/ccache/cache.go +++ /dev/null @@ -1,218 +0,0 @@ -// An LRU cached aimed at high concurrency -package ccache - -import ( - "container/list" - "hash/fnv" - "sync/atomic" - "time" -) - -type Cache struct { - *Configuration - list *list.List - size int64 - buckets []*bucket - bucketMask uint32 - deletables chan *Item - promotables chan *Item -} - -// Create a new cache with the specified configuration -// See ccache.Configure() for creating a configuration -func New(config *Configuration) *Cache { - c := &Cache{ - list: list.New(), - Configuration: config, - bucketMask: uint32(config.buckets) - 1, - buckets: make([]*bucket, config.buckets), - deletables: make(chan *Item, config.deleteBuffer), - promotables: make(chan *Item, config.promoteBuffer), - } - for i := 0; i < int(config.buckets); i++ { - c.buckets[i] = &bucket{ - lookup: make(map[string]*Item), - } - } - go c.worker() - return c -} - -// Get an item from the cache. Returns nil if the item wasn't found. -// This can return an expired item. Use item.Expired() to see if the item -// is expired and item.TTL() to see how long until the item expires (which -// will be negative for an already expired item). -func (c *Cache) Get(key string) *Item { - item := c.bucket(key).get(key) - if item == nil { - return nil - } - if item.expires > time.Now().UnixNano() { - c.promote(item) - } - return item -} - -// Used when the cache was created with the Track() configuration option. -// Avoid otherwise -func (c *Cache) TrackingGet(key string) TrackedItem { - item := c.Get(key) - if item == nil { - return NilTracked - } - item.track() - return item -} - -// Set the value in the cache for the specified duration -func (c *Cache) Set(key string, value interface{}, duration time.Duration) { - c.set(key, value, duration) -} - -// Replace the value if it exists, does not set if it doesn't. -// Returns true if the item existed an was replaced, false otherwise. -// Replace does not reset item's TTL -func (c *Cache) Replace(key string, value interface{}) bool { - item := c.bucket(key).get(key) - if item == nil { - return false - } - c.Set(key, value, item.TTL()) - return true -} - -// Attempts to get the value from the cache and calles fetch on a miss (missing -// or stale item). If fetch returns an error, no value is cached and the error -// is returned back to the caller. -func (c *Cache) Fetch(key string, duration time.Duration, fetch func() (interface{}, error)) (*Item, error) { - item := c.Get(key) - if item != nil && !item.Expired() { - return item, nil - } - value, err := fetch() - if err != nil { - return nil, err - } - return c.set(key, value, duration), nil -} - -// Remove the item from the cache, return true if the item was present, false otherwise. -func (c *Cache) Delete(key string) bool { - item := c.bucket(key).delete(key) - if item != nil { - c.deletables <- item - return true - } - return false -} - -//this isn't thread safe. It's meant to be called from non-concurrent tests -func (c *Cache) Clear() { - for _, bucket := range c.buckets { - bucket.clear() - } - c.size = 0 - c.list = list.New() -} - -// Stops the background worker. Operations performed on the cache after Stop -// is called are likely to panic -func (c *Cache) Stop() { - close(c.promotables) -} - -func (c *Cache) deleteItem(bucket *bucket, item *Item) { - bucket.delete(item.key) //stop other GETs from getting it - c.deletables <- item -} - -func (c *Cache) set(key string, value interface{}, duration time.Duration) *Item { - item, existing := c.bucket(key).set(key, value, duration) - if existing != nil { - c.deletables <- existing - } - c.promote(item) - return item -} - -func (c *Cache) bucket(key string) *bucket { - h := fnv.New32a() - h.Write([]byte(key)) - return c.buckets[h.Sum32()&c.bucketMask] -} - -func (c *Cache) promote(item *Item) { - c.promotables <- item -} - -func (c *Cache) worker() { - for { - select { - case item, ok := <-c.promotables: - if ok == false { - goto drain - } - if c.doPromote(item) && c.size > c.maxSize { - c.gc() - } - case item := <-c.deletables: - c.doDelete(item) - } - } - -drain: - for { - select { - case item := <-c.deletables: - c.doDelete(item) - default: - close(c.deletables) - return - } - } -} - -func (c *Cache) doDelete(item *Item) { - if item.element == nil { - item.promotions = -2 - } else { - c.size -= item.size - c.list.Remove(item.element) - } -} - -func (c *Cache) doPromote(item *Item) bool { - //already deleted - if item.promotions == -2 { - return false - } - if item.element != nil { //not a new item - if item.shouldPromote(c.getsPerPromote) { - c.list.MoveToFront(item.element) - item.promotions = 0 - } - return false - } - - c.size += item.size - item.element = c.list.PushFront(item) - return true -} - -func (c *Cache) gc() { - element := c.list.Back() - for i := 0; i < c.itemsToPrune; i++ { - if element == nil { - return - } - prev := element.Prev() - item := element.Value.(*Item) - if c.tracking == false || atomic.LoadInt32(&item.refCount) == 0 { - c.bucket(item.key).delete(item.key) - c.size -= item.size - c.list.Remove(element) - item.promotions = -2 - } - element = prev - } -} diff --git a/cmd/vendor/github.com/karlseguin/ccache/configuration.go b/cmd/vendor/github.com/karlseguin/ccache/configuration.go deleted file mode 100644 index daa835776..000000000 --- a/cmd/vendor/github.com/karlseguin/ccache/configuration.go +++ /dev/null @@ -1,94 +0,0 @@ -package ccache - -type Configuration struct { - maxSize int64 - buckets int - itemsToPrune int - deleteBuffer int - promoteBuffer int - getsPerPromote int32 - tracking bool -} - -// Creates a configuration object with sensible defaults -// Use this as the start of the fluent configuration: -// e.g.: ccache.New(ccache.Configure().MaxSize(10000)) -func Configure() *Configuration { - return &Configuration{ - buckets: 16, - itemsToPrune: 500, - deleteBuffer: 1024, - getsPerPromote: 3, - promoteBuffer: 1024, - maxSize: 5000, - tracking: false, - } -} - -// The max size for the cache -// [5000] -func (c *Configuration) MaxSize(max int64) *Configuration { - c.maxSize = max - return c -} - -// Keys are hashed into % bucket count to provide greater concurrency (every set -// requires a write lock on the bucket). Must be a power of 2 (1, 2, 4, 8, 16, ...) -// [16] -func (c *Configuration) Buckets(count uint32) *Configuration { - if count == 0 || ((count&(^count+1)) == count) == false { - count = 16 - } - c.buckets = int(count) - return c -} - -// The number of items to prune when memory is low -// [500] -func (c *Configuration) ItemsToPrune(count uint32) *Configuration { - c.itemsToPrune = int(count) - return c -} - -// The size of the queue for items which should be promoted. If the queue fills -// up, promotions are skipped -// [1024] -func (c *Configuration) PromoteBuffer(size uint32) *Configuration { - c.promoteBuffer = int(size) - return c -} - -// The size of the queue for items which should be deleted. If the queue fills -// up, calls to Delete() will block -func (c *Configuration) DeleteBuffer(size uint32) *Configuration { - c.deleteBuffer = int(size) - return c -} - -// Give a large cache with a high read / write ratio, it's usually unecessary -// to promote an item on every Get. GetsPerPromote specifies the number of Gets -// a key must have before being promoted -// [3] -func (c *Configuration) GetsPerPromote(count int32) *Configuration { - c.getsPerPromote = count - return c -} - -// Typically, a cache is agnostic about how cached values are use. This is fine -// for a typical cache usage, where you fetch an item from the cache, do something -// (write it out) and nothing else. - -// However, if callers are going to keep a reference to a cached item for a long -// time, things get messy. Specifically, the cache can evict the item, while -// references still exist. Technically, this isn't an issue. However, if you reload -// the item back into the cache, you end up with 2 objects representing the same -// data. This is a waste of space and could lead to weird behavior (the type an -// identity map is meant to solve). - -// By turning tracking on and using the cache's TrackingGet, the cache -// won't evict items which you haven't called Release() on. It's a simple reference -// counter. -func (c *Configuration) Track() *Configuration { - c.tracking = true - return c -} diff --git a/cmd/vendor/github.com/karlseguin/ccache/item.go b/cmd/vendor/github.com/karlseguin/ccache/item.go deleted file mode 100644 index bb7c04fff..000000000 --- a/cmd/vendor/github.com/karlseguin/ccache/item.go +++ /dev/null @@ -1,103 +0,0 @@ -package ccache - -import ( - "container/list" - "sync/atomic" - "time" -) - -type Sized interface { - Size() int64 -} - -type TrackedItem interface { - Value() interface{} - Release() - Expired() bool - TTL() time.Duration - Expires() time.Time - Extend(duration time.Duration) -} - -type nilItem struct{} - -func (n *nilItem) Value() interface{} { return nil } -func (n *nilItem) Release() {} - -func (i *nilItem) Expired() bool { - return true -} - -func (i *nilItem) TTL() time.Duration { - return time.Minute -} - -func (i *nilItem) Expires() time.Time { - return time.Time{} -} - -func (i *nilItem) Extend(duration time.Duration) { -} - -var NilTracked = new(nilItem) - -type Item struct { - key string - group string - promotions int32 - refCount int32 - expires int64 - size int64 - value interface{} - element *list.Element -} - -func newItem(key string, value interface{}, expires int64) *Item { - size := int64(1) - if sized, ok := value.(Sized); ok { - size = sized.Size() - } - return &Item{ - key: key, - value: value, - promotions: 0, - size: size, - expires: expires, - } -} - -func (i *Item) shouldPromote(getsPerPromote int32) bool { - i.promotions += 1 - return i.promotions == getsPerPromote -} - -func (i *Item) Value() interface{} { - return i.value -} - -func (i *Item) track() { - atomic.AddInt32(&i.refCount, 1) -} - -func (i *Item) Release() { - atomic.AddInt32(&i.refCount, -1) -} - -func (i *Item) Expired() bool { - expires := atomic.LoadInt64(&i.expires) - return expires < time.Now().UnixNano() -} - -func (i *Item) TTL() time.Duration { - expires := atomic.LoadInt64(&i.expires) - return time.Nanosecond * time.Duration(expires-time.Now().UnixNano()) -} - -func (i *Item) Expires() time.Time { - expires := atomic.LoadInt64(&i.expires) - return time.Unix(0, expires) -} - -func (i *Item) Extend(duration time.Duration) { - atomic.StoreInt64(&i.expires, time.Now().Add(duration).UnixNano()) -} diff --git a/cmd/vendor/github.com/karlseguin/ccache/layeredbucket.go b/cmd/vendor/github.com/karlseguin/ccache/layeredbucket.go deleted file mode 100644 index 88f3def42..000000000 --- a/cmd/vendor/github.com/karlseguin/ccache/layeredbucket.go +++ /dev/null @@ -1,82 +0,0 @@ -package ccache - -import ( - "sync" - "time" -) - -type layeredBucket struct { - sync.RWMutex - buckets map[string]*bucket -} - -func (b *layeredBucket) get(primary, secondary string) *Item { - bucket := b.getSecondaryBucket(primary) - if bucket == nil { - return nil - } - return bucket.get(secondary) -} - -func (b *layeredBucket) getSecondaryBucket(primary string) *bucket { - b.RLock() - bucket, exists := b.buckets[primary] - b.RUnlock() - if exists == false { - return nil - } - return bucket -} - -func (b *layeredBucket) set(primary, secondary string, value interface{}, duration time.Duration) (*Item, *Item) { - b.Lock() - bkt, exists := b.buckets[primary] - if exists == false { - bkt = &bucket{lookup: make(map[string]*Item)} - b.buckets[primary] = bkt - } - b.Unlock() - item, existing := bkt.set(secondary, value, duration) - item.group = primary - return item, existing -} - -func (b *layeredBucket) delete(primary, secondary string) *Item { - b.RLock() - bucket, exists := b.buckets[primary] - b.RUnlock() - if exists == false { - return nil - } - return bucket.delete(secondary) -} - -func (b *layeredBucket) deleteAll(primary string, deletables chan *Item) bool { - b.RLock() - bucket, exists := b.buckets[primary] - b.RUnlock() - if exists == false { - return false - } - - bucket.Lock() - defer bucket.Unlock() - - if l := len(bucket.lookup); l == 0 { - return false - } - for key, item := range bucket.lookup { - delete(bucket.lookup, key) - deletables <- item - } - return true -} - -func (b *layeredBucket) clear() { - b.Lock() - defer b.Unlock() - for _, bucket := range b.buckets { - bucket.clear() - } - b.buckets = make(map[string]*bucket) -} diff --git a/cmd/vendor/github.com/karlseguin/ccache/layeredcache.go b/cmd/vendor/github.com/karlseguin/ccache/layeredcache.go deleted file mode 100644 index e064eed24..000000000 --- a/cmd/vendor/github.com/karlseguin/ccache/layeredcache.go +++ /dev/null @@ -1,222 +0,0 @@ -// An LRU cached aimed at high concurrency -package ccache - -import ( - "container/list" - "hash/fnv" - "sync/atomic" - "time" -) - -type LayeredCache struct { - *Configuration - list *list.List - buckets []*layeredBucket - bucketMask uint32 - size int64 - deletables chan *Item - promotables chan *Item -} - -// Create a new layered cache with the specified configuration. -// A layered cache used a two keys to identify a value: a primary key -// and a secondary key. Get, Set and Delete require both a primary and -// secondary key. However, DeleteAll requires only a primary key, deleting -// all values that share the same primary key. - -// Layered Cache is useful as an HTTP cache, where an HTTP purge might -// delete multiple variants of the same resource: -// primary key = "user/44" -// secondary key 1 = ".json" -// secondary key 2 = ".xml" - -// See ccache.Configure() for creating a configuration -func Layered(config *Configuration) *LayeredCache { - c := &LayeredCache{ - list: list.New(), - Configuration: config, - bucketMask: uint32(config.buckets) - 1, - buckets: make([]*layeredBucket, config.buckets), - deletables: make(chan *Item, config.deleteBuffer), - promotables: make(chan *Item, config.promoteBuffer), - } - for i := 0; i < int(config.buckets); i++ { - c.buckets[i] = &layeredBucket{ - buckets: make(map[string]*bucket), - } - } - go c.worker() - return c -} - -// Get an item from the cache. Returns nil if the item wasn't found. -// This can return an expired item. Use item.Expired() to see if the item -// is expired and item.TTL() to see how long until the item expires (which -// will be negative for an already expired item). -func (c *LayeredCache) Get(primary, secondary string) *Item { - item := c.bucket(primary).get(primary, secondary) - if item == nil { - return nil - } - if item.expires > time.Now().UnixNano() { - c.promote(item) - } - return item -} - -// Get the secondary cache for a given primary key. This operation will -// never return nil. In the case where the primary key does not exist, a -// new, underlying, empty bucket will be created and returned. -func (c *LayeredCache) GetOrCreateSecondaryCache(primary string) *SecondaryCache { - primaryBkt := c.bucket(primary) - bkt := primaryBkt.getSecondaryBucket(primary) - primaryBkt.Lock() - if bkt == nil { - bkt = &bucket{lookup: make(map[string]*Item)} - primaryBkt.buckets[primary] = bkt - } - primaryBkt.Unlock() - return &SecondaryCache{ - bucket: bkt, - pCache: c, - } -} - -// Used when the cache was created with the Track() configuration option. -// Avoid otherwise -func (c *LayeredCache) TrackingGet(primary, secondary string) TrackedItem { - item := c.Get(primary, secondary) - if item == nil { - return NilTracked - } - item.track() - return item -} - -// Set the value in the cache for the specified duration -func (c *LayeredCache) Set(primary, secondary string, value interface{}, duration time.Duration) { - c.set(primary, secondary, value, duration) -} - -// Replace the value if it exists, does not set if it doesn't. -// Returns true if the item existed an was replaced, false otherwise. -// Replace does not reset item's TTL nor does it alter its position in the LRU -func (c *LayeredCache) Replace(primary, secondary string, value interface{}) bool { - item := c.bucket(primary).get(primary, secondary) - if item == nil { - return false - } - c.Set(primary, secondary, value, item.TTL()) - return true -} - -// Attempts to get the value from the cache and calles fetch on a miss. -// If fetch returns an error, no value is cached and the error is returned back -// to the caller. -func (c *LayeredCache) Fetch(primary, secondary string, duration time.Duration, fetch func() (interface{}, error)) (*Item, error) { - item := c.Get(primary, secondary) - if item != nil { - return item, nil - } - value, err := fetch() - if err != nil { - return nil, err - } - return c.set(primary, secondary, value, duration), nil -} - -// Remove the item from the cache, return true if the item was present, false otherwise. -func (c *LayeredCache) Delete(primary, secondary string) bool { - item := c.bucket(primary).delete(primary, secondary) - if item != nil { - c.deletables <- item - return true - } - return false -} - -// Deletes all items that share the same primary key -func (c *LayeredCache) DeleteAll(primary string) bool { - return c.bucket(primary).deleteAll(primary, c.deletables) -} - -//this isn't thread safe. It's meant to be called from non-concurrent tests -func (c *LayeredCache) Clear() { - for _, bucket := range c.buckets { - bucket.clear() - } - c.size = 0 - c.list = list.New() -} - -func (c *LayeredCache) set(primary, secondary string, value interface{}, duration time.Duration) *Item { - item, existing := c.bucket(primary).set(primary, secondary, value, duration) - if existing != nil { - c.deletables <- existing - } - c.promote(item) - return item -} - -func (c *LayeredCache) bucket(key string) *layeredBucket { - h := fnv.New32a() - h.Write([]byte(key)) - return c.buckets[h.Sum32()&c.bucketMask] -} - -func (c *LayeredCache) promote(item *Item) { - c.promotables <- item -} - -func (c *LayeredCache) worker() { - for { - select { - case item := <-c.promotables: - if c.doPromote(item) && c.size > c.maxSize { - c.gc() - } - case item := <-c.deletables: - if item.element == nil { - item.promotions = -2 - } else { - c.size -= item.size - c.list.Remove(item.element) - } - } - } -} - -func (c *LayeredCache) doPromote(item *Item) bool { - // deleted before it ever got promoted - if item.promotions == -2 { - return false - } - if item.element != nil { //not a new item - if item.shouldPromote(c.getsPerPromote) { - c.list.MoveToFront(item.element) - item.promotions = 0 - } - return false - } - c.size += item.size - item.element = c.list.PushFront(item) - return true -} - -func (c *LayeredCache) gc() { - element := c.list.Back() - for i := 0; i < c.itemsToPrune; i++ { - if element == nil { - return - } - prev := element.Prev() - item := element.Value.(*Item) - if c.tracking == false || atomic.LoadInt32(&item.refCount) == 0 { - c.bucket(item.group).delete(item.group, item.key) - c.size -= item.size - c.list.Remove(element) - item.promotions = -2 - } - element = prev - } -} diff --git a/cmd/vendor/github.com/karlseguin/ccache/license.txt b/cmd/vendor/github.com/karlseguin/ccache/license.txt deleted file mode 100644 index aebeebfa5..000000000 --- a/cmd/vendor/github.com/karlseguin/ccache/license.txt +++ /dev/null @@ -1,19 +0,0 @@ -Copyright (c) 2013 Karl Seguin. - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. diff --git a/cmd/vendor/github.com/karlseguin/ccache/secondarycache.go b/cmd/vendor/github.com/karlseguin/ccache/secondarycache.go deleted file mode 100644 index f901fde0c..000000000 --- a/cmd/vendor/github.com/karlseguin/ccache/secondarycache.go +++ /dev/null @@ -1,72 +0,0 @@ -package ccache - -import "time" - -type SecondaryCache struct { - bucket *bucket - pCache *LayeredCache -} - -// Get the secondary key. -// The semantics are the same as for LayeredCache.Get -func (s *SecondaryCache) Get(secondary string) *Item { - return s.bucket.get(secondary) -} - -// Set the secondary key to a value. -// The semantics are the same as for LayeredCache.Set -func (s *SecondaryCache) Set(secondary string, value interface{}, duration time.Duration) *Item { - item, existing := s.bucket.set(secondary, value, duration) - if existing != nil { - s.pCache.deletables <- existing - } - s.pCache.promote(item) - return item -} - -// Fetch or set a secondary key. -// The semantics are the same as for LayeredCache.Fetch -func (s *SecondaryCache) Fetch(secondary string, duration time.Duration, fetch func() (interface{}, error)) (*Item, error) { - item := s.Get(secondary) - if item != nil { - return item, nil - } - value, err := fetch() - if err != nil { - return nil, err - } - return s.Set(secondary, value, duration), nil -} - -// Delete a secondary key. -// The semantics are the same as for LayeredCache.Delete -func (s *SecondaryCache) Delete(secondary string) bool { - item := s.bucket.delete(secondary) - if item != nil { - s.pCache.deletables <- item - return true - } - return false -} - -// Replace a secondary key. -// The semantics are the same as for LayeredCache.Replace -func (s *SecondaryCache) Replace(secondary string, value interface{}) bool { - item := s.Get(secondary) - if item == nil { - return false - } - s.Set(secondary, value, item.TTL()) - return true -} - -// Track a secondary key. -// The semantics are the same as for LayeredCache.TrackingGet -func (c *SecondaryCache) TrackingGet(secondary string) TrackedItem { - item := c.Get(secondary) - if item == nil { - return NilTracked - } - item.track() - return item -} diff --git a/glide.lock b/glide.lock index c9cf1b42c..7a8a14603 100644 --- a/glide.lock +++ b/glide.lock @@ -38,6 +38,10 @@ imports: version: 909568be09de550ed094403c2bf8a261b5bb730a subpackages: - proto +- name: github.com/golang/groupcache + version: 02826c3e79038b59d737d3b1c0a1d937f71a4433 + subpackages: + - lru - name: github.com/golang/protobuf version: 4bd1920723d7b7c925de087aa32e2187708897f7 subpackages: @@ -57,8 +61,6 @@ imports: version: 76626ae9c91c4f2a10f34cad8ce83ea42c93bb75 - name: github.com/jonboulle/clockwork version: 2eee05ed794112d45db504eb05aa693efd2b8b09 -- name: github.com/karlseguin/ccache - version: a2d62155777b39595c825ed3824279e642a5db3c - name: github.com/kr/pty version: f7ee69f31298ecbe5d2b349c711e2547a617d398 - name: github.com/mattn/go-runewidth diff --git a/glide.yaml b/glide.yaml index 071c461f4..4b2c5bf1b 100644 --- a/glide.yaml +++ b/glide.yaml @@ -28,6 +28,10 @@ import: version: v0.3 subpackages: - proto +- package: github.com/golang/groupcache + version: 02826c3e79038b59d737d3b1c0a1d937f71a4433 + subpackages: + - lru - package: github.com/golang/protobuf version: 4bd1920723d7b7c925de087aa32e2187708897f7 subpackages: @@ -101,5 +105,3 @@ import: version: 976c720a22c8eb4eb6a0b4348ad85ad12491a506 subpackages: - assert -- package: github.com/karlseguin/ccache - version: v2.0.2 diff --git a/proxy/grpcproxy/cache/store.go b/proxy/grpcproxy/cache/store.go index 155bbf900..895fb1f97 100644 --- a/proxy/grpcproxy/cache/store.go +++ b/proxy/grpcproxy/cache/store.go @@ -17,13 +17,11 @@ package cache import ( "errors" "sync" - "time" - - "github.com/karlseguin/ccache" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/adt" + "github.com/golang/groupcache/lru" ) var ( @@ -31,14 +29,12 @@ var ( ErrCompacted = rpctypes.ErrGRPCCompacted ) -const defaultHistoricTTL = time.Hour -const defaultCurrentTTL = time.Minute - type Cache interface { Add(req *pb.RangeRequest, resp *pb.RangeResponse) Get(req *pb.RangeRequest) (*pb.RangeResponse, error) Compact(revision int64) Invalidate(key []byte, endkey []byte) + Size() int Close() } @@ -54,17 +50,17 @@ func keyFunc(req *pb.RangeRequest) string { func NewCache(maxCacheEntries int) Cache { return &cache{ - lru: ccache.New(ccache.Configure().MaxSize(int64(maxCacheEntries))), + lru: lru.New(maxCacheEntries), compactedRev: -1, } } -func (c *cache) Close() { c.lru.Stop() } +func (c *cache) Close() {} // cache implements Cache type cache struct { mu sync.RWMutex - lru *ccache.Cache + lru *lru.Cache // a reverse index for cache invalidation cachedRanges adt.IntervalTree @@ -80,11 +76,7 @@ func (c *cache) Add(req *pb.RangeRequest, resp *pb.RangeResponse) { defer c.mu.Unlock() if req.Revision > c.compactedRev { - if req.Revision == 0 { - c.lru.Set(key, resp, defaultCurrentTTL) - } else { - c.lru.Set(key, resp, defaultHistoricTTL) - } + c.lru.Add(key, resp) } // we do not need to invalidate a request with a revision specified. // so we do not need to add it into the reverse index. @@ -116,16 +108,16 @@ func (c *cache) Add(req *pb.RangeRequest, resp *pb.RangeResponse) { func (c *cache) Get(req *pb.RangeRequest) (*pb.RangeResponse, error) { key := keyFunc(req) - c.mu.RLock() - defer c.mu.RUnlock() + c.mu.Lock() + defer c.mu.Unlock() if req.Revision < c.compactedRev { - c.lru.Delete(key) + c.lru.Remove(key) return nil, ErrCompacted } - if item := c.lru.Get(key); item != nil { - return item.Value().(*pb.RangeResponse), nil + if resp, ok := c.lru.Get(key); ok { + return resp.(*pb.RangeResponse), nil } return nil, errors.New("not exist") } @@ -149,7 +141,7 @@ func (c *cache) Invalidate(key, endkey []byte) { for _, iv := range ivs { keys := iv.Val.([]string) for _, key := range keys { - c.lru.Delete(key) + c.lru.Remove(key) } } // delete after removing all keys since it is destructive to 'ivs' @@ -166,3 +158,9 @@ func (c *cache) Compact(revision int64) { c.compactedRev = revision } } + +func (c *cache) Size() int { + c.mu.RLock() + defer c.mu.RUnlock() + return c.lru.Len() +} diff --git a/proxy/grpcproxy/kv.go b/proxy/grpcproxy/kv.go index 3729d65a5..309ee75da 100644 --- a/proxy/grpcproxy/kv.go +++ b/proxy/grpcproxy/kv.go @@ -65,12 +65,14 @@ func (p *kvProxy) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRespo req.Serializable = true gresp := (*pb.RangeResponse)(resp.Get()) p.cache.Add(&req, gresp) + cacheKeys.Set(float64(p.cache.Size())) return gresp, nil } func (p *kvProxy) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { p.cache.Invalidate(r.Key, nil) + cacheKeys.Set(float64(p.cache.Size())) resp, err := p.kv.Do(ctx, PutRequestToOp(r)) return (*pb.PutResponse)(resp.Put()), err @@ -78,6 +80,7 @@ func (p *kvProxy) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, e func (p *kvProxy) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { p.cache.Invalidate(r.Key, r.RangeEnd) + cacheKeys.Set(float64(p.cache.Size())) resp, err := p.kv.Do(ctx, DelRequestToOp(r)) return (*pb.DeleteRangeResponse)(resp.Del()), err @@ -133,6 +136,8 @@ func (p *kvProxy) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, e p.txnToCache(r.Failure, resp.Responses) } + cacheKeys.Set(float64(p.cache.Size())) + return (*pb.TxnResponse)(resp), nil } @@ -147,6 +152,8 @@ func (p *kvProxy) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.Com p.cache.Compact(r.Revision) } + cacheKeys.Set(float64(p.cache.Size())) + return (*pb.CompactionResponse)(resp), err } diff --git a/proxy/grpcproxy/metrics.go b/proxy/grpcproxy/metrics.go index f4a1d4c8d..864fa1609 100644 --- a/proxy/grpcproxy/metrics.go +++ b/proxy/grpcproxy/metrics.go @@ -29,6 +29,12 @@ var ( Name: "events_coalescing_total", Help: "Total number of events coalescing", }) + cacheKeys = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "grpc_proxy", + Name: "cache_keys_total", + Help: "Total number of keys/ranges cached", + }) cacheHits = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "etcd", Subsystem: "grpc_proxy", @@ -46,6 +52,7 @@ var ( func init() { prometheus.MustRegister(watchersCoalescing) prometheus.MustRegister(eventsCoalescing) + prometheus.MustRegister(cacheKeys) prometheus.MustRegister(cacheHits) prometheus.MustRegister(cachedMisses) }