Merge pull request #7304 from heyitsanthony/remove-ccache

Remove ccache
This commit is contained in:
Anthony Romano 2017-02-10 16:02:31 -08:00 committed by GitHub
commit 9f8e82e1c0
15 changed files with 352 additions and 875 deletions

191
cmd/vendor/github.com/golang/groupcache/LICENSE generated vendored Normal file
View File

@ -0,0 +1,191 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction, and
distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by the copyright
owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all other entities
that control, are controlled by, or are under common control with that entity.
For the purposes of this definition, "control" means (i) the power, direct or
indirect, to cause the direction or management of such entity, whether by
contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity exercising
permissions granted by this License.
"Source" form shall mean the preferred form for making modifications, including
but not limited to software source code, documentation source, and configuration
files.
"Object" form shall mean any form resulting from mechanical transformation or
translation of a Source form, including but not limited to compiled object code,
generated documentation, and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or Object form, made
available under the License, as indicated by a copyright notice that is included
in or attached to the work (an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object form, that
is based on (or derived from) the Work and for which the editorial revisions,
annotations, elaborations, or other modifications represent, as a whole, an
original work of authorship. For the purposes of this License, Derivative Works
shall not include works that remain separable from, or merely link (or bind by
name) to the interfaces of, the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including the original version
of the Work and any modifications or additions to that Work or Derivative Works
thereof, that is intentionally submitted to Licensor for inclusion in the Work
by the copyright owner or by an individual or Legal Entity authorized to submit
on behalf of the copyright owner. For the purposes of this definition,
"submitted" means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems, and
issue tracking systems that are managed by, or on behalf of, the Licensor for
the purpose of discussing and improving the Work, but excluding communication
that is conspicuously marked or otherwise designated in writing by the copyright
owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity on behalf
of whom a Contribution has been received by Licensor and subsequently
incorporated within the Work.
2. Grant of Copyright License.
Subject to the terms and conditions of this License, each Contributor hereby
grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free,
irrevocable copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the Work and such
Derivative Works in Source or Object form.
3. Grant of Patent License.
Subject to the terms and conditions of this License, each Contributor hereby
grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free,
irrevocable (except as stated in this section) patent license to make, have
made, use, offer to sell, sell, import, and otherwise transfer the Work, where
such license applies only to those patent claims licensable by such Contributor
that are necessarily infringed by their Contribution(s) alone or by combination
of their Contribution(s) with the Work to which such Contribution(s) was
submitted. If You institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work or a
Contribution incorporated within the Work constitutes direct or contributory
patent infringement, then any patent licenses granted to You under this License
for that Work shall terminate as of the date such litigation is filed.
4. Redistribution.
You may reproduce and distribute copies of the Work or Derivative Works thereof
in any medium, with or without modifications, and in Source or Object form,
provided that You meet the following conditions:
You must give any other recipients of the Work or Derivative Works a copy of
this License; and
You must cause any modified files to carry prominent notices stating that You
changed the files; and
You must retain, in the Source form of any Derivative Works that You distribute,
all copyright, patent, trademark, and attribution notices from the Source form
of the Work, excluding those notices that do not pertain to any part of the
Derivative Works; and
If the Work includes a "NOTICE" text file as part of its distribution, then any
Derivative Works that You distribute must include a readable copy of the
attribution notices contained within such NOTICE file, excluding those notices
that do not pertain to any part of the Derivative Works, in at least one of the
following places: within a NOTICE text file distributed as part of the
Derivative Works; within the Source form or documentation, if provided along
with the Derivative Works; or, within a display generated by the Derivative
Works, if and wherever such third-party notices normally appear. The contents of
the NOTICE file are for informational purposes only and do not modify the
License. You may add Your own attribution notices within Derivative Works that
You distribute, alongside or as an addendum to the NOTICE text from the Work,
provided that such additional attribution notices cannot be construed as
modifying the License.
You may add Your own copyright statement to Your modifications and may provide
additional or different license terms and conditions for use, reproduction, or
distribution of Your modifications, or for any such Derivative Works as a whole,
provided Your use, reproduction, and distribution of the Work otherwise complies
with the conditions stated in this License.
5. Submission of Contributions.
Unless You explicitly state otherwise, any Contribution intentionally submitted
for inclusion in the Work by You to the Licensor shall be under the terms and
conditions of this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify the terms of
any separate license agreement you may have executed with Licensor regarding
such Contributions.
6. Trademarks.
This License does not grant permission to use the trade names, trademarks,
service marks, or product names of the Licensor, except as required for
reasonable and customary use in describing the origin of the Work and
reproducing the content of the NOTICE file.
7. Disclaimer of Warranty.
Unless required by applicable law or agreed to in writing, Licensor provides the
Work (and each Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied,
including, without limitation, any warranties or conditions of TITLE,
NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are
solely responsible for determining the appropriateness of using or
redistributing the Work and assume any risks associated with Your exercise of
permissions under this License.
8. Limitation of Liability.
In no event and under no legal theory, whether in tort (including negligence),
contract, or otherwise, unless required by applicable law (such as deliberate
and grossly negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special, incidental,
or consequential damages of any character arising as a result of this License or
out of the use or inability to use the Work (including but not limited to
damages for loss of goodwill, work stoppage, computer failure or malfunction, or
any and all other commercial damages or losses), even if such Contributor has
been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability.
While redistributing the Work or Derivative Works thereof, You may choose to
offer, and charge a fee for, acceptance of support, warranty, indemnity, or
other liability obligations and/or rights consistent with this License. However,
in accepting such obligations, You may act only on Your own behalf and on Your
sole responsibility, not on behalf of any other Contributor, and only if You
agree to indemnify, defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason of your
accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work
To apply the Apache License to your work, attach the following boilerplate
notice, with the fields enclosed by brackets "[]" replaced with your own
identifying information. (Don't include the brackets!) The text should be
enclosed in the appropriate comment syntax for the file format. We also
recommend that a file or class name and description of purpose be included on
the same "printed page" as the copyright notice for easier identification within
third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

121
cmd/vendor/github.com/golang/groupcache/lru/lru.go generated vendored Normal file
View File

@ -0,0 +1,121 @@
/*
Copyright 2013 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package lru implements an LRU cache.
package lru
import "container/list"
// Cache is an LRU cache. It is not safe for concurrent access.
type Cache struct {
// MaxEntries is the maximum number of cache entries before
// an item is evicted. Zero means no limit.
MaxEntries int
// OnEvicted optionally specificies a callback function to be
// executed when an entry is purged from the cache.
OnEvicted func(key Key, value interface{})
ll *list.List
cache map[interface{}]*list.Element
}
// A Key may be any value that is comparable. See http://golang.org/ref/spec#Comparison_operators
type Key interface{}
type entry struct {
key Key
value interface{}
}
// New creates a new Cache.
// If maxEntries is zero, the cache has no limit and it's assumed
// that eviction is done by the caller.
func New(maxEntries int) *Cache {
return &Cache{
MaxEntries: maxEntries,
ll: list.New(),
cache: make(map[interface{}]*list.Element),
}
}
// Add adds a value to the cache.
func (c *Cache) Add(key Key, value interface{}) {
if c.cache == nil {
c.cache = make(map[interface{}]*list.Element)
c.ll = list.New()
}
if ee, ok := c.cache[key]; ok {
c.ll.MoveToFront(ee)
ee.Value.(*entry).value = value
return
}
ele := c.ll.PushFront(&entry{key, value})
c.cache[key] = ele
if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
c.RemoveOldest()
}
}
// Get looks up a key's value from the cache.
func (c *Cache) Get(key Key) (value interface{}, ok bool) {
if c.cache == nil {
return
}
if ele, hit := c.cache[key]; hit {
c.ll.MoveToFront(ele)
return ele.Value.(*entry).value, true
}
return
}
// Remove removes the provided key from the cache.
func (c *Cache) Remove(key Key) {
if c.cache == nil {
return
}
if ele, hit := c.cache[key]; hit {
c.removeElement(ele)
}
}
// RemoveOldest removes the oldest item from the cache.
func (c *Cache) RemoveOldest() {
if c.cache == nil {
return
}
ele := c.ll.Back()
if ele != nil {
c.removeElement(ele)
}
}
func (c *Cache) removeElement(e *list.Element) {
c.ll.Remove(e)
kv := e.Value.(*entry)
delete(c.cache, kv.key)
if c.OnEvicted != nil {
c.OnEvicted(kv.key, kv.value)
}
}
// Len returns the number of items in the cache.
func (c *Cache) Len() int {
if c.cache == nil {
return 0
}
return c.ll.Len()
}

View File

@ -1,41 +0,0 @@
package ccache
import (
"sync"
"time"
)
type bucket struct {
sync.RWMutex
lookup map[string]*Item
}
func (b *bucket) get(key string) *Item {
b.RLock()
defer b.RUnlock()
return b.lookup[key]
}
func (b *bucket) set(key string, value interface{}, duration time.Duration) (*Item, *Item) {
expires := time.Now().Add(duration).UnixNano()
item := newItem(key, value, expires)
b.Lock()
defer b.Unlock()
existing := b.lookup[key]
b.lookup[key] = item
return item, existing
}
func (b *bucket) delete(key string) *Item {
b.Lock()
defer b.Unlock()
item := b.lookup[key]
delete(b.lookup, key)
return item
}
func (b *bucket) clear() {
b.Lock()
defer b.Unlock()
b.lookup = make(map[string]*Item)
}

View File

@ -1,218 +0,0 @@
// An LRU cached aimed at high concurrency
package ccache
import (
"container/list"
"hash/fnv"
"sync/atomic"
"time"
)
type Cache struct {
*Configuration
list *list.List
size int64
buckets []*bucket
bucketMask uint32
deletables chan *Item
promotables chan *Item
}
// Create a new cache with the specified configuration
// See ccache.Configure() for creating a configuration
func New(config *Configuration) *Cache {
c := &Cache{
list: list.New(),
Configuration: config,
bucketMask: uint32(config.buckets) - 1,
buckets: make([]*bucket, config.buckets),
deletables: make(chan *Item, config.deleteBuffer),
promotables: make(chan *Item, config.promoteBuffer),
}
for i := 0; i < int(config.buckets); i++ {
c.buckets[i] = &bucket{
lookup: make(map[string]*Item),
}
}
go c.worker()
return c
}
// Get an item from the cache. Returns nil if the item wasn't found.
// This can return an expired item. Use item.Expired() to see if the item
// is expired and item.TTL() to see how long until the item expires (which
// will be negative for an already expired item).
func (c *Cache) Get(key string) *Item {
item := c.bucket(key).get(key)
if item == nil {
return nil
}
if item.expires > time.Now().UnixNano() {
c.promote(item)
}
return item
}
// Used when the cache was created with the Track() configuration option.
// Avoid otherwise
func (c *Cache) TrackingGet(key string) TrackedItem {
item := c.Get(key)
if item == nil {
return NilTracked
}
item.track()
return item
}
// Set the value in the cache for the specified duration
func (c *Cache) Set(key string, value interface{}, duration time.Duration) {
c.set(key, value, duration)
}
// Replace the value if it exists, does not set if it doesn't.
// Returns true if the item existed an was replaced, false otherwise.
// Replace does not reset item's TTL
func (c *Cache) Replace(key string, value interface{}) bool {
item := c.bucket(key).get(key)
if item == nil {
return false
}
c.Set(key, value, item.TTL())
return true
}
// Attempts to get the value from the cache and calles fetch on a miss (missing
// or stale item). If fetch returns an error, no value is cached and the error
// is returned back to the caller.
func (c *Cache) Fetch(key string, duration time.Duration, fetch func() (interface{}, error)) (*Item, error) {
item := c.Get(key)
if item != nil && !item.Expired() {
return item, nil
}
value, err := fetch()
if err != nil {
return nil, err
}
return c.set(key, value, duration), nil
}
// Remove the item from the cache, return true if the item was present, false otherwise.
func (c *Cache) Delete(key string) bool {
item := c.bucket(key).delete(key)
if item != nil {
c.deletables <- item
return true
}
return false
}
//this isn't thread safe. It's meant to be called from non-concurrent tests
func (c *Cache) Clear() {
for _, bucket := range c.buckets {
bucket.clear()
}
c.size = 0
c.list = list.New()
}
// Stops the background worker. Operations performed on the cache after Stop
// is called are likely to panic
func (c *Cache) Stop() {
close(c.promotables)
}
func (c *Cache) deleteItem(bucket *bucket, item *Item) {
bucket.delete(item.key) //stop other GETs from getting it
c.deletables <- item
}
func (c *Cache) set(key string, value interface{}, duration time.Duration) *Item {
item, existing := c.bucket(key).set(key, value, duration)
if existing != nil {
c.deletables <- existing
}
c.promote(item)
return item
}
func (c *Cache) bucket(key string) *bucket {
h := fnv.New32a()
h.Write([]byte(key))
return c.buckets[h.Sum32()&c.bucketMask]
}
func (c *Cache) promote(item *Item) {
c.promotables <- item
}
func (c *Cache) worker() {
for {
select {
case item, ok := <-c.promotables:
if ok == false {
goto drain
}
if c.doPromote(item) && c.size > c.maxSize {
c.gc()
}
case item := <-c.deletables:
c.doDelete(item)
}
}
drain:
for {
select {
case item := <-c.deletables:
c.doDelete(item)
default:
close(c.deletables)
return
}
}
}
func (c *Cache) doDelete(item *Item) {
if item.element == nil {
item.promotions = -2
} else {
c.size -= item.size
c.list.Remove(item.element)
}
}
func (c *Cache) doPromote(item *Item) bool {
//already deleted
if item.promotions == -2 {
return false
}
if item.element != nil { //not a new item
if item.shouldPromote(c.getsPerPromote) {
c.list.MoveToFront(item.element)
item.promotions = 0
}
return false
}
c.size += item.size
item.element = c.list.PushFront(item)
return true
}
func (c *Cache) gc() {
element := c.list.Back()
for i := 0; i < c.itemsToPrune; i++ {
if element == nil {
return
}
prev := element.Prev()
item := element.Value.(*Item)
if c.tracking == false || atomic.LoadInt32(&item.refCount) == 0 {
c.bucket(item.key).delete(item.key)
c.size -= item.size
c.list.Remove(element)
item.promotions = -2
}
element = prev
}
}

View File

@ -1,94 +0,0 @@
package ccache
type Configuration struct {
maxSize int64
buckets int
itemsToPrune int
deleteBuffer int
promoteBuffer int
getsPerPromote int32
tracking bool
}
// Creates a configuration object with sensible defaults
// Use this as the start of the fluent configuration:
// e.g.: ccache.New(ccache.Configure().MaxSize(10000))
func Configure() *Configuration {
return &Configuration{
buckets: 16,
itemsToPrune: 500,
deleteBuffer: 1024,
getsPerPromote: 3,
promoteBuffer: 1024,
maxSize: 5000,
tracking: false,
}
}
// The max size for the cache
// [5000]
func (c *Configuration) MaxSize(max int64) *Configuration {
c.maxSize = max
return c
}
// Keys are hashed into % bucket count to provide greater concurrency (every set
// requires a write lock on the bucket). Must be a power of 2 (1, 2, 4, 8, 16, ...)
// [16]
func (c *Configuration) Buckets(count uint32) *Configuration {
if count == 0 || ((count&(^count+1)) == count) == false {
count = 16
}
c.buckets = int(count)
return c
}
// The number of items to prune when memory is low
// [500]
func (c *Configuration) ItemsToPrune(count uint32) *Configuration {
c.itemsToPrune = int(count)
return c
}
// The size of the queue for items which should be promoted. If the queue fills
// up, promotions are skipped
// [1024]
func (c *Configuration) PromoteBuffer(size uint32) *Configuration {
c.promoteBuffer = int(size)
return c
}
// The size of the queue for items which should be deleted. If the queue fills
// up, calls to Delete() will block
func (c *Configuration) DeleteBuffer(size uint32) *Configuration {
c.deleteBuffer = int(size)
return c
}
// Give a large cache with a high read / write ratio, it's usually unecessary
// to promote an item on every Get. GetsPerPromote specifies the number of Gets
// a key must have before being promoted
// [3]
func (c *Configuration) GetsPerPromote(count int32) *Configuration {
c.getsPerPromote = count
return c
}
// Typically, a cache is agnostic about how cached values are use. This is fine
// for a typical cache usage, where you fetch an item from the cache, do something
// (write it out) and nothing else.
// However, if callers are going to keep a reference to a cached item for a long
// time, things get messy. Specifically, the cache can evict the item, while
// references still exist. Technically, this isn't an issue. However, if you reload
// the item back into the cache, you end up with 2 objects representing the same
// data. This is a waste of space and could lead to weird behavior (the type an
// identity map is meant to solve).
// By turning tracking on and using the cache's TrackingGet, the cache
// won't evict items which you haven't called Release() on. It's a simple reference
// counter.
func (c *Configuration) Track() *Configuration {
c.tracking = true
return c
}

View File

@ -1,103 +0,0 @@
package ccache
import (
"container/list"
"sync/atomic"
"time"
)
type Sized interface {
Size() int64
}
type TrackedItem interface {
Value() interface{}
Release()
Expired() bool
TTL() time.Duration
Expires() time.Time
Extend(duration time.Duration)
}
type nilItem struct{}
func (n *nilItem) Value() interface{} { return nil }
func (n *nilItem) Release() {}
func (i *nilItem) Expired() bool {
return true
}
func (i *nilItem) TTL() time.Duration {
return time.Minute
}
func (i *nilItem) Expires() time.Time {
return time.Time{}
}
func (i *nilItem) Extend(duration time.Duration) {
}
var NilTracked = new(nilItem)
type Item struct {
key string
group string
promotions int32
refCount int32
expires int64
size int64
value interface{}
element *list.Element
}
func newItem(key string, value interface{}, expires int64) *Item {
size := int64(1)
if sized, ok := value.(Sized); ok {
size = sized.Size()
}
return &Item{
key: key,
value: value,
promotions: 0,
size: size,
expires: expires,
}
}
func (i *Item) shouldPromote(getsPerPromote int32) bool {
i.promotions += 1
return i.promotions == getsPerPromote
}
func (i *Item) Value() interface{} {
return i.value
}
func (i *Item) track() {
atomic.AddInt32(&i.refCount, 1)
}
func (i *Item) Release() {
atomic.AddInt32(&i.refCount, -1)
}
func (i *Item) Expired() bool {
expires := atomic.LoadInt64(&i.expires)
return expires < time.Now().UnixNano()
}
func (i *Item) TTL() time.Duration {
expires := atomic.LoadInt64(&i.expires)
return time.Nanosecond * time.Duration(expires-time.Now().UnixNano())
}
func (i *Item) Expires() time.Time {
expires := atomic.LoadInt64(&i.expires)
return time.Unix(0, expires)
}
func (i *Item) Extend(duration time.Duration) {
atomic.StoreInt64(&i.expires, time.Now().Add(duration).UnixNano())
}

View File

@ -1,82 +0,0 @@
package ccache
import (
"sync"
"time"
)
type layeredBucket struct {
sync.RWMutex
buckets map[string]*bucket
}
func (b *layeredBucket) get(primary, secondary string) *Item {
bucket := b.getSecondaryBucket(primary)
if bucket == nil {
return nil
}
return bucket.get(secondary)
}
func (b *layeredBucket) getSecondaryBucket(primary string) *bucket {
b.RLock()
bucket, exists := b.buckets[primary]
b.RUnlock()
if exists == false {
return nil
}
return bucket
}
func (b *layeredBucket) set(primary, secondary string, value interface{}, duration time.Duration) (*Item, *Item) {
b.Lock()
bkt, exists := b.buckets[primary]
if exists == false {
bkt = &bucket{lookup: make(map[string]*Item)}
b.buckets[primary] = bkt
}
b.Unlock()
item, existing := bkt.set(secondary, value, duration)
item.group = primary
return item, existing
}
func (b *layeredBucket) delete(primary, secondary string) *Item {
b.RLock()
bucket, exists := b.buckets[primary]
b.RUnlock()
if exists == false {
return nil
}
return bucket.delete(secondary)
}
func (b *layeredBucket) deleteAll(primary string, deletables chan *Item) bool {
b.RLock()
bucket, exists := b.buckets[primary]
b.RUnlock()
if exists == false {
return false
}
bucket.Lock()
defer bucket.Unlock()
if l := len(bucket.lookup); l == 0 {
return false
}
for key, item := range bucket.lookup {
delete(bucket.lookup, key)
deletables <- item
}
return true
}
func (b *layeredBucket) clear() {
b.Lock()
defer b.Unlock()
for _, bucket := range b.buckets {
bucket.clear()
}
b.buckets = make(map[string]*bucket)
}

View File

@ -1,222 +0,0 @@
// An LRU cached aimed at high concurrency
package ccache
import (
"container/list"
"hash/fnv"
"sync/atomic"
"time"
)
type LayeredCache struct {
*Configuration
list *list.List
buckets []*layeredBucket
bucketMask uint32
size int64
deletables chan *Item
promotables chan *Item
}
// Create a new layered cache with the specified configuration.
// A layered cache used a two keys to identify a value: a primary key
// and a secondary key. Get, Set and Delete require both a primary and
// secondary key. However, DeleteAll requires only a primary key, deleting
// all values that share the same primary key.
// Layered Cache is useful as an HTTP cache, where an HTTP purge might
// delete multiple variants of the same resource:
// primary key = "user/44"
// secondary key 1 = ".json"
// secondary key 2 = ".xml"
// See ccache.Configure() for creating a configuration
func Layered(config *Configuration) *LayeredCache {
c := &LayeredCache{
list: list.New(),
Configuration: config,
bucketMask: uint32(config.buckets) - 1,
buckets: make([]*layeredBucket, config.buckets),
deletables: make(chan *Item, config.deleteBuffer),
promotables: make(chan *Item, config.promoteBuffer),
}
for i := 0; i < int(config.buckets); i++ {
c.buckets[i] = &layeredBucket{
buckets: make(map[string]*bucket),
}
}
go c.worker()
return c
}
// Get an item from the cache. Returns nil if the item wasn't found.
// This can return an expired item. Use item.Expired() to see if the item
// is expired and item.TTL() to see how long until the item expires (which
// will be negative for an already expired item).
func (c *LayeredCache) Get(primary, secondary string) *Item {
item := c.bucket(primary).get(primary, secondary)
if item == nil {
return nil
}
if item.expires > time.Now().UnixNano() {
c.promote(item)
}
return item
}
// Get the secondary cache for a given primary key. This operation will
// never return nil. In the case where the primary key does not exist, a
// new, underlying, empty bucket will be created and returned.
func (c *LayeredCache) GetOrCreateSecondaryCache(primary string) *SecondaryCache {
primaryBkt := c.bucket(primary)
bkt := primaryBkt.getSecondaryBucket(primary)
primaryBkt.Lock()
if bkt == nil {
bkt = &bucket{lookup: make(map[string]*Item)}
primaryBkt.buckets[primary] = bkt
}
primaryBkt.Unlock()
return &SecondaryCache{
bucket: bkt,
pCache: c,
}
}
// Used when the cache was created with the Track() configuration option.
// Avoid otherwise
func (c *LayeredCache) TrackingGet(primary, secondary string) TrackedItem {
item := c.Get(primary, secondary)
if item == nil {
return NilTracked
}
item.track()
return item
}
// Set the value in the cache for the specified duration
func (c *LayeredCache) Set(primary, secondary string, value interface{}, duration time.Duration) {
c.set(primary, secondary, value, duration)
}
// Replace the value if it exists, does not set if it doesn't.
// Returns true if the item existed an was replaced, false otherwise.
// Replace does not reset item's TTL nor does it alter its position in the LRU
func (c *LayeredCache) Replace(primary, secondary string, value interface{}) bool {
item := c.bucket(primary).get(primary, secondary)
if item == nil {
return false
}
c.Set(primary, secondary, value, item.TTL())
return true
}
// Attempts to get the value from the cache and calles fetch on a miss.
// If fetch returns an error, no value is cached and the error is returned back
// to the caller.
func (c *LayeredCache) Fetch(primary, secondary string, duration time.Duration, fetch func() (interface{}, error)) (*Item, error) {
item := c.Get(primary, secondary)
if item != nil {
return item, nil
}
value, err := fetch()
if err != nil {
return nil, err
}
return c.set(primary, secondary, value, duration), nil
}
// Remove the item from the cache, return true if the item was present, false otherwise.
func (c *LayeredCache) Delete(primary, secondary string) bool {
item := c.bucket(primary).delete(primary, secondary)
if item != nil {
c.deletables <- item
return true
}
return false
}
// Deletes all items that share the same primary key
func (c *LayeredCache) DeleteAll(primary string) bool {
return c.bucket(primary).deleteAll(primary, c.deletables)
}
//this isn't thread safe. It's meant to be called from non-concurrent tests
func (c *LayeredCache) Clear() {
for _, bucket := range c.buckets {
bucket.clear()
}
c.size = 0
c.list = list.New()
}
func (c *LayeredCache) set(primary, secondary string, value interface{}, duration time.Duration) *Item {
item, existing := c.bucket(primary).set(primary, secondary, value, duration)
if existing != nil {
c.deletables <- existing
}
c.promote(item)
return item
}
func (c *LayeredCache) bucket(key string) *layeredBucket {
h := fnv.New32a()
h.Write([]byte(key))
return c.buckets[h.Sum32()&c.bucketMask]
}
func (c *LayeredCache) promote(item *Item) {
c.promotables <- item
}
func (c *LayeredCache) worker() {
for {
select {
case item := <-c.promotables:
if c.doPromote(item) && c.size > c.maxSize {
c.gc()
}
case item := <-c.deletables:
if item.element == nil {
item.promotions = -2
} else {
c.size -= item.size
c.list.Remove(item.element)
}
}
}
}
func (c *LayeredCache) doPromote(item *Item) bool {
// deleted before it ever got promoted
if item.promotions == -2 {
return false
}
if item.element != nil { //not a new item
if item.shouldPromote(c.getsPerPromote) {
c.list.MoveToFront(item.element)
item.promotions = 0
}
return false
}
c.size += item.size
item.element = c.list.PushFront(item)
return true
}
func (c *LayeredCache) gc() {
element := c.list.Back()
for i := 0; i < c.itemsToPrune; i++ {
if element == nil {
return
}
prev := element.Prev()
item := element.Value.(*Item)
if c.tracking == false || atomic.LoadInt32(&item.refCount) == 0 {
c.bucket(item.group).delete(item.group, item.key)
c.size -= item.size
c.list.Remove(element)
item.promotions = -2
}
element = prev
}
}

View File

@ -1,19 +0,0 @@
Copyright (c) 2013 Karl Seguin.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View File

@ -1,72 +0,0 @@
package ccache
import "time"
type SecondaryCache struct {
bucket *bucket
pCache *LayeredCache
}
// Get the secondary key.
// The semantics are the same as for LayeredCache.Get
func (s *SecondaryCache) Get(secondary string) *Item {
return s.bucket.get(secondary)
}
// Set the secondary key to a value.
// The semantics are the same as for LayeredCache.Set
func (s *SecondaryCache) Set(secondary string, value interface{}, duration time.Duration) *Item {
item, existing := s.bucket.set(secondary, value, duration)
if existing != nil {
s.pCache.deletables <- existing
}
s.pCache.promote(item)
return item
}
// Fetch or set a secondary key.
// The semantics are the same as for LayeredCache.Fetch
func (s *SecondaryCache) Fetch(secondary string, duration time.Duration, fetch func() (interface{}, error)) (*Item, error) {
item := s.Get(secondary)
if item != nil {
return item, nil
}
value, err := fetch()
if err != nil {
return nil, err
}
return s.Set(secondary, value, duration), nil
}
// Delete a secondary key.
// The semantics are the same as for LayeredCache.Delete
func (s *SecondaryCache) Delete(secondary string) bool {
item := s.bucket.delete(secondary)
if item != nil {
s.pCache.deletables <- item
return true
}
return false
}
// Replace a secondary key.
// The semantics are the same as for LayeredCache.Replace
func (s *SecondaryCache) Replace(secondary string, value interface{}) bool {
item := s.Get(secondary)
if item == nil {
return false
}
s.Set(secondary, value, item.TTL())
return true
}
// Track a secondary key.
// The semantics are the same as for LayeredCache.TrackingGet
func (c *SecondaryCache) TrackingGet(secondary string) TrackedItem {
item := c.Get(secondary)
if item == nil {
return NilTracked
}
item.track()
return item
}

6
glide.lock generated
View File

@ -38,6 +38,10 @@ imports:
version: 909568be09de550ed094403c2bf8a261b5bb730a
subpackages:
- proto
- name: github.com/golang/groupcache
version: 02826c3e79038b59d737d3b1c0a1d937f71a4433
subpackages:
- lru
- name: github.com/golang/protobuf
version: 4bd1920723d7b7c925de087aa32e2187708897f7
subpackages:
@ -57,8 +61,6 @@ imports:
version: 76626ae9c91c4f2a10f34cad8ce83ea42c93bb75
- name: github.com/jonboulle/clockwork
version: 2eee05ed794112d45db504eb05aa693efd2b8b09
- name: github.com/karlseguin/ccache
version: a2d62155777b39595c825ed3824279e642a5db3c
- name: github.com/kr/pty
version: f7ee69f31298ecbe5d2b349c711e2547a617d398
- name: github.com/mattn/go-runewidth

View File

@ -28,6 +28,10 @@ import:
version: v0.3
subpackages:
- proto
- package: github.com/golang/groupcache
version: 02826c3e79038b59d737d3b1c0a1d937f71a4433
subpackages:
- lru
- package: github.com/golang/protobuf
version: 4bd1920723d7b7c925de087aa32e2187708897f7
subpackages:
@ -101,5 +105,3 @@ import:
version: 976c720a22c8eb4eb6a0b4348ad85ad12491a506
subpackages:
- assert
- package: github.com/karlseguin/ccache
version: v2.0.2

View File

@ -17,13 +17,11 @@ package cache
import (
"errors"
"sync"
"time"
"github.com/karlseguin/ccache"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/adt"
"github.com/golang/groupcache/lru"
)
var (
@ -31,14 +29,12 @@ var (
ErrCompacted = rpctypes.ErrGRPCCompacted
)
const defaultHistoricTTL = time.Hour
const defaultCurrentTTL = time.Minute
type Cache interface {
Add(req *pb.RangeRequest, resp *pb.RangeResponse)
Get(req *pb.RangeRequest) (*pb.RangeResponse, error)
Compact(revision int64)
Invalidate(key []byte, endkey []byte)
Size() int
Close()
}
@ -54,17 +50,17 @@ func keyFunc(req *pb.RangeRequest) string {
func NewCache(maxCacheEntries int) Cache {
return &cache{
lru: ccache.New(ccache.Configure().MaxSize(int64(maxCacheEntries))),
lru: lru.New(maxCacheEntries),
compactedRev: -1,
}
}
func (c *cache) Close() { c.lru.Stop() }
func (c *cache) Close() {}
// cache implements Cache
type cache struct {
mu sync.RWMutex
lru *ccache.Cache
lru *lru.Cache
// a reverse index for cache invalidation
cachedRanges adt.IntervalTree
@ -80,11 +76,7 @@ func (c *cache) Add(req *pb.RangeRequest, resp *pb.RangeResponse) {
defer c.mu.Unlock()
if req.Revision > c.compactedRev {
if req.Revision == 0 {
c.lru.Set(key, resp, defaultCurrentTTL)
} else {
c.lru.Set(key, resp, defaultHistoricTTL)
}
c.lru.Add(key, resp)
}
// we do not need to invalidate a request with a revision specified.
// so we do not need to add it into the reverse index.
@ -116,16 +108,16 @@ func (c *cache) Add(req *pb.RangeRequest, resp *pb.RangeResponse) {
func (c *cache) Get(req *pb.RangeRequest) (*pb.RangeResponse, error) {
key := keyFunc(req)
c.mu.RLock()
defer c.mu.RUnlock()
c.mu.Lock()
defer c.mu.Unlock()
if req.Revision < c.compactedRev {
c.lru.Delete(key)
c.lru.Remove(key)
return nil, ErrCompacted
}
if item := c.lru.Get(key); item != nil {
return item.Value().(*pb.RangeResponse), nil
if resp, ok := c.lru.Get(key); ok {
return resp.(*pb.RangeResponse), nil
}
return nil, errors.New("not exist")
}
@ -149,7 +141,7 @@ func (c *cache) Invalidate(key, endkey []byte) {
for _, iv := range ivs {
keys := iv.Val.([]string)
for _, key := range keys {
c.lru.Delete(key)
c.lru.Remove(key)
}
}
// delete after removing all keys since it is destructive to 'ivs'
@ -166,3 +158,9 @@ func (c *cache) Compact(revision int64) {
c.compactedRev = revision
}
}
func (c *cache) Size() int {
c.mu.RLock()
defer c.mu.RUnlock()
return c.lru.Len()
}

View File

@ -65,12 +65,14 @@ func (p *kvProxy) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRespo
req.Serializable = true
gresp := (*pb.RangeResponse)(resp.Get())
p.cache.Add(&req, gresp)
cacheKeys.Set(float64(p.cache.Size()))
return gresp, nil
}
func (p *kvProxy) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
p.cache.Invalidate(r.Key, nil)
cacheKeys.Set(float64(p.cache.Size()))
resp, err := p.kv.Do(ctx, PutRequestToOp(r))
return (*pb.PutResponse)(resp.Put()), err
@ -78,6 +80,7 @@ func (p *kvProxy) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, e
func (p *kvProxy) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
p.cache.Invalidate(r.Key, r.RangeEnd)
cacheKeys.Set(float64(p.cache.Size()))
resp, err := p.kv.Do(ctx, DelRequestToOp(r))
return (*pb.DeleteRangeResponse)(resp.Del()), err
@ -133,6 +136,8 @@ func (p *kvProxy) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, e
p.txnToCache(r.Failure, resp.Responses)
}
cacheKeys.Set(float64(p.cache.Size()))
return (*pb.TxnResponse)(resp), nil
}
@ -147,6 +152,8 @@ func (p *kvProxy) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.Com
p.cache.Compact(r.Revision)
}
cacheKeys.Set(float64(p.cache.Size()))
return (*pb.CompactionResponse)(resp), err
}

View File

@ -29,6 +29,12 @@ var (
Name: "events_coalescing_total",
Help: "Total number of events coalescing",
})
cacheKeys = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "grpc_proxy",
Name: "cache_keys_total",
Help: "Total number of keys/ranges cached",
})
cacheHits = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "grpc_proxy",
@ -46,6 +52,7 @@ var (
func init() {
prometheus.MustRegister(watchersCoalescing)
prometheus.MustRegister(eventsCoalescing)
prometheus.MustRegister(cacheKeys)
prometheus.MustRegister(cacheHits)
prometheus.MustRegister(cachedMisses)
}