mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
store: switch to fake clock
This commit is contained in:
parent
47c2421f7b
commit
3134658ded
@ -5,6 +5,7 @@ import (
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
|
||||
etcdErr "github.com/coreos/etcd/error"
|
||||
)
|
||||
|
||||
@ -119,7 +120,7 @@ func (n *node) Write(value string, index uint64) *etcdErr.Error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *node) ExpirationAndTTL() (*time.Time, int64) {
|
||||
func (n *node) expirationAndTTL(clock clockwork.Clock) (*time.Time, int64) {
|
||||
if !n.IsPermanent() {
|
||||
/* compute ttl as:
|
||||
ceiling( (expireTime - timeNow) / nanosecondsPerSecond )
|
||||
@ -128,7 +129,7 @@ func (n *node) ExpirationAndTTL() (*time.Time, int64) {
|
||||
( (expireTime - timeNow) / nanosecondsPerSecond ) + 1
|
||||
which ranges 1..n+1
|
||||
*/
|
||||
ttlN := n.ExpireTime.Sub(time.Now())
|
||||
ttlN := n.ExpireTime.Sub(clock.Now())
|
||||
ttl := ttlN / time.Second
|
||||
if (ttlN % time.Second) > 0 {
|
||||
ttl++
|
||||
@ -251,7 +252,7 @@ func (n *node) Remove(dir, recursive bool, callback func(path string)) *etcdErr.
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *node) Repr(recursive, sorted bool) *NodeExtern {
|
||||
func (n *node) Repr(recursive, sorted bool, clock clockwork.Clock) *NodeExtern {
|
||||
if n.IsDir() {
|
||||
node := &NodeExtern{
|
||||
Key: n.Path,
|
||||
@ -259,7 +260,7 @@ func (n *node) Repr(recursive, sorted bool) *NodeExtern {
|
||||
ModifiedIndex: n.ModifiedIndex,
|
||||
CreatedIndex: n.CreatedIndex,
|
||||
}
|
||||
node.Expiration, node.TTL = n.ExpirationAndTTL()
|
||||
node.Expiration, node.TTL = n.expirationAndTTL(clock)
|
||||
|
||||
if !recursive {
|
||||
return node
|
||||
@ -278,7 +279,7 @@ func (n *node) Repr(recursive, sorted bool) *NodeExtern {
|
||||
continue
|
||||
}
|
||||
|
||||
node.Nodes[i] = child.Repr(recursive, sorted)
|
||||
node.Nodes[i] = child.Repr(recursive, sorted, clock)
|
||||
|
||||
i++
|
||||
}
|
||||
@ -300,7 +301,7 @@ func (n *node) Repr(recursive, sorted bool) *NodeExtern {
|
||||
ModifiedIndex: n.ModifiedIndex,
|
||||
CreatedIndex: n.CreatedIndex,
|
||||
}
|
||||
node.Expiration, node.TTL = n.ExpirationAndTTL()
|
||||
node.Expiration, node.TTL = n.expirationAndTTL(clock)
|
||||
return node
|
||||
}
|
||||
|
||||
|
@ -3,6 +3,8 @@ package store
|
||||
import (
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
|
||||
)
|
||||
|
||||
// NodeExtern is the external representation of the
|
||||
@ -20,7 +22,7 @@ type NodeExtern struct {
|
||||
CreatedIndex uint64 `json:"createdIndex,omitempty"`
|
||||
}
|
||||
|
||||
func (eNode *NodeExtern) loadInternalNode(n *node, recursive, sorted bool) {
|
||||
func (eNode *NodeExtern) loadInternalNode(n *node, recursive, sorted bool, clock clockwork.Clock) {
|
||||
if n.IsDir() { // node is a directory
|
||||
eNode.Dir = true
|
||||
|
||||
@ -36,7 +38,7 @@ func (eNode *NodeExtern) loadInternalNode(n *node, recursive, sorted bool) {
|
||||
continue
|
||||
}
|
||||
|
||||
eNode.Nodes[i] = child.Repr(recursive, sorted)
|
||||
eNode.Nodes[i] = child.Repr(recursive, sorted, clock)
|
||||
i++
|
||||
}
|
||||
|
||||
@ -52,7 +54,7 @@ func (eNode *NodeExtern) loadInternalNode(n *node, recursive, sorted bool) {
|
||||
eNode.Value = &value
|
||||
}
|
||||
|
||||
eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
|
||||
eNode.Expiration, eNode.TTL = n.expirationAndTTL(clock)
|
||||
}
|
||||
|
||||
type NodeExterns []*NodeExtern
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
@ -87,15 +88,12 @@ func TestStoreStatsDeleteFail(t *testing.T) {
|
||||
//Ensure that the number of expirations is recorded in the stats.
|
||||
func TestStoreStatsExpireCount(t *testing.T) {
|
||||
s := newStore()
|
||||
fc := clockwork.NewFakeClock()
|
||||
s.clock = fc
|
||||
|
||||
c := make(chan bool)
|
||||
defer func() {
|
||||
c <- true
|
||||
}()
|
||||
|
||||
go mockSyncService(s.DeleteExpiredKeys, c)
|
||||
s.Create("/foo", false, "bar", false, time.Now().Add(500*time.Millisecond))
|
||||
s.Create("/foo", false, "bar", false, fc.Now().Add(500*time.Millisecond))
|
||||
assert.Equal(t, uint64(0), s.Stats.ExpireCount, "")
|
||||
time.Sleep(600 * time.Millisecond)
|
||||
fc.Tick(600 * time.Millisecond)
|
||||
s.DeleteExpiredKeys(fc.Now())
|
||||
assert.Equal(t, uint64(1), s.Stats.ExpireCount, "")
|
||||
}
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
|
||||
etcdErr "github.com/coreos/etcd/error"
|
||||
)
|
||||
|
||||
@ -68,10 +69,13 @@ type store struct {
|
||||
CurrentVersion int
|
||||
ttlKeyHeap *ttlKeyHeap // need to recovery manually
|
||||
worldLock sync.RWMutex // stop the world lock
|
||||
clock clockwork.Clock
|
||||
}
|
||||
|
||||
func New() Store {
|
||||
return newStore()
|
||||
s := newStore()
|
||||
s.clock = clockwork.NewRealClock()
|
||||
return s
|
||||
}
|
||||
|
||||
func newStore() *store {
|
||||
@ -114,7 +118,7 @@ func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
|
||||
|
||||
e := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
|
||||
e.EtcdIndex = s.CurrentIndex
|
||||
e.Node.loadInternalNode(n, recursive, sorted)
|
||||
e.Node.loadInternalNode(n, recursive, sorted, s.clock)
|
||||
|
||||
s.Stats.Inc(GetSuccess)
|
||||
|
||||
@ -172,7 +176,7 @@ func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Tim
|
||||
// Put prevNode into event
|
||||
if getErr == nil {
|
||||
prev := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
|
||||
prev.Node.loadInternalNode(n, false, false)
|
||||
prev.Node.loadInternalNode(n, false, false, s.clock)
|
||||
e.PrevNode = prev.Node
|
||||
}
|
||||
|
||||
@ -230,7 +234,7 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
|
||||
|
||||
e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex, n.CreatedIndex)
|
||||
e.EtcdIndex = s.CurrentIndex
|
||||
e.PrevNode = n.Repr(false, false)
|
||||
e.PrevNode = n.Repr(false, false, s.clock)
|
||||
eNode := e.Node
|
||||
|
||||
// if test succeed, write the value
|
||||
@ -240,7 +244,7 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
|
||||
// copy the value for safety
|
||||
valueCopy := value
|
||||
eNode.Value = &valueCopy
|
||||
eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
|
||||
eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
|
||||
|
||||
s.WatcherHub.notify(e)
|
||||
s.Stats.Inc(CompareAndSwapSuccess)
|
||||
@ -275,7 +279,7 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
|
||||
nextIndex := s.CurrentIndex + 1
|
||||
e := newEvent(Delete, nodePath, nextIndex, n.CreatedIndex)
|
||||
e.EtcdIndex = nextIndex
|
||||
e.PrevNode = n.Repr(false, false)
|
||||
e.PrevNode = n.Repr(false, false, s.clock)
|
||||
eNode := e.Node
|
||||
|
||||
if n.IsDir() {
|
||||
@ -335,7 +339,7 @@ func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex ui
|
||||
|
||||
e := newEvent(CompareAndDelete, nodePath, s.CurrentIndex, n.CreatedIndex)
|
||||
e.EtcdIndex = s.CurrentIndex
|
||||
e.PrevNode = n.Repr(false, false)
|
||||
e.PrevNode = n.Repr(false, false, s.clock)
|
||||
|
||||
callback := func(path string) { // notify function
|
||||
// notify the watchers with deleted set true
|
||||
@ -414,7 +418,7 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (
|
||||
|
||||
e := newEvent(Update, nodePath, nextIndex, n.CreatedIndex)
|
||||
e.EtcdIndex = nextIndex
|
||||
e.PrevNode = n.Repr(false, false)
|
||||
e.PrevNode = n.Repr(false, false, s.clock)
|
||||
eNode := e.Node
|
||||
|
||||
if n.IsDir() && len(newValue) != 0 {
|
||||
@ -436,7 +440,7 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (
|
||||
// update ttl
|
||||
n.UpdateTTL(expireTime)
|
||||
|
||||
eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
|
||||
eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
|
||||
|
||||
s.WatcherHub.notify(e)
|
||||
|
||||
@ -463,12 +467,6 @@ func (s *store) internalCreate(nodePath string, dir bool, value string, unique,
|
||||
return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", currIndex)
|
||||
}
|
||||
|
||||
// Assume expire times that are way in the past are not valid.
|
||||
// This can occur when the time is serialized to JSON and read back in.
|
||||
if expireTime.Before(minExpireTime) {
|
||||
expireTime = Permanent
|
||||
}
|
||||
|
||||
dirName, nodeName := path.Split(nodePath)
|
||||
|
||||
// walk through the nodePath, create dirs and get the last directory node
|
||||
@ -491,7 +489,7 @@ func (s *store) internalCreate(nodePath string, dir bool, value string, unique,
|
||||
if n.IsDir() {
|
||||
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
|
||||
}
|
||||
e.PrevNode = n.Repr(false, false)
|
||||
e.PrevNode = n.Repr(false, false, s.clock)
|
||||
|
||||
n.Remove(false, false, nil)
|
||||
} else {
|
||||
@ -519,7 +517,7 @@ func (s *store) internalCreate(nodePath string, dir bool, value string, unique,
|
||||
if !n.IsPermanent() {
|
||||
s.ttlKeyHeap.push(n)
|
||||
|
||||
eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
|
||||
eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
|
||||
}
|
||||
|
||||
s.CurrentIndex = nextIndex
|
||||
@ -568,7 +566,7 @@ func (s *store) DeleteExpiredKeys(cutoff time.Time) {
|
||||
s.CurrentIndex++
|
||||
e := newEvent(Expire, node.Path, s.CurrentIndex, node.CreatedIndex)
|
||||
e.EtcdIndex = s.CurrentIndex
|
||||
e.PrevNode = node.Repr(false, false)
|
||||
e.PrevNode = node.Repr(false, false, s.clock)
|
||||
|
||||
callback := func(path string) { // notify function
|
||||
// notify the watchers with deleted set true
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/stretchr/testify/assert"
|
||||
etcdErr "github.com/coreos/etcd/error"
|
||||
)
|
||||
@ -41,13 +42,15 @@ func TestStoreGetValue(t *testing.T) {
|
||||
// Note that hidden files should not be returned.
|
||||
func TestStoreGetDirectory(t *testing.T) {
|
||||
s := newStore()
|
||||
fc := clockwork.NewFakeClock()
|
||||
s.clock = fc
|
||||
s.Create("/foo", true, "", false, Permanent)
|
||||
s.Create("/foo/bar", false, "X", false, Permanent)
|
||||
s.Create("/foo/_hidden", false, "*", false, Permanent)
|
||||
s.Create("/foo/baz", true, "", false, Permanent)
|
||||
s.Create("/foo/baz/bat", false, "Y", false, Permanent)
|
||||
s.Create("/foo/baz/_hidden", false, "*", false, Permanent)
|
||||
s.Create("/foo/baz/ttl", false, "Y", false, time.Now().Add(time.Second*3))
|
||||
s.Create("/foo/baz/ttl", false, "Y", false, fc.Now().Add(time.Second*3))
|
||||
var eidx uint64 = 7
|
||||
e, err := s.Get("/foo", true, false)
|
||||
assert.Nil(t, err, "")
|
||||
@ -311,21 +314,17 @@ func TestStoreUpdateFailsIfDirectory(t *testing.T) {
|
||||
// Ensure that the store can update the TTL on a value.
|
||||
func TestStoreUpdateValueTTL(t *testing.T) {
|
||||
s := newStore()
|
||||
|
||||
c := make(chan bool)
|
||||
defer func() {
|
||||
c <- true
|
||||
}()
|
||||
go mockSyncService(s.DeleteExpiredKeys, c)
|
||||
fc := clockwork.NewFakeClock()
|
||||
s.clock = fc
|
||||
|
||||
var eidx uint64 = 2
|
||||
s.Create("/foo", false, "bar", false, Permanent)
|
||||
_, err := s.Update("/foo", "baz", time.Now().Add(500*time.Millisecond))
|
||||
_, err := s.Update("/foo", "baz", fc.Now().Add(500*time.Millisecond))
|
||||
e, _ := s.Get("/foo", false, false)
|
||||
assert.Equal(t, *e.Node.Value, "baz", "")
|
||||
assert.Equal(t, e.EtcdIndex, eidx, "")
|
||||
|
||||
time.Sleep(600 * time.Millisecond)
|
||||
fc.Tick(600 * time.Millisecond)
|
||||
s.DeleteExpiredKeys(fc.Now())
|
||||
e, err = s.Get("/foo", false, false)
|
||||
assert.Nil(t, e, "")
|
||||
assert.Equal(t, err.(*etcdErr.Error).ErrorCode, etcdErr.EcodeKeyNotFound, "")
|
||||
@ -334,24 +333,21 @@ func TestStoreUpdateValueTTL(t *testing.T) {
|
||||
// Ensure that the store can update the TTL on a directory.
|
||||
func TestStoreUpdateDirTTL(t *testing.T) {
|
||||
s := newStore()
|
||||
|
||||
c := make(chan bool)
|
||||
defer func() {
|
||||
c <- true
|
||||
}()
|
||||
go mockSyncService(s.DeleteExpiredKeys, c)
|
||||
fc := clockwork.NewFakeClock()
|
||||
s.clock = fc
|
||||
|
||||
var eidx uint64 = 3
|
||||
s.Create("/foo", true, "", false, Permanent)
|
||||
s.Create("/foo/bar", false, "baz", false, Permanent)
|
||||
e, err := s.Update("/foo", "", time.Now().Add(500*time.Millisecond))
|
||||
e, err := s.Update("/foo", "", fc.Now().Add(500*time.Millisecond))
|
||||
assert.Equal(t, e.Node.Dir, true, "")
|
||||
assert.Equal(t, e.EtcdIndex, eidx, "")
|
||||
e, _ = s.Get("/foo/bar", false, false)
|
||||
assert.Equal(t, *e.Node.Value, "baz", "")
|
||||
assert.Equal(t, e.EtcdIndex, eidx, "")
|
||||
|
||||
time.Sleep(600 * time.Millisecond)
|
||||
fc.Tick(600 * time.Millisecond)
|
||||
s.DeleteExpiredKeys(fc.Now())
|
||||
e, err = s.Get("/foo/bar", false, false)
|
||||
assert.Nil(t, e, "")
|
||||
assert.Equal(t, err.(*etcdErr.Error).ErrorCode, etcdErr.EcodeKeyNotFound, "")
|
||||
@ -707,23 +703,20 @@ func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) {
|
||||
// Ensure that the store can watch for key expiration.
|
||||
func TestStoreWatchExpire(t *testing.T) {
|
||||
s := newStore()
|
||||
|
||||
stopChan := make(chan bool)
|
||||
defer func() {
|
||||
stopChan <- true
|
||||
}()
|
||||
go mockSyncService(s.DeleteExpiredKeys, stopChan)
|
||||
fc := clockwork.NewFakeClock()
|
||||
s.clock = fc
|
||||
|
||||
var eidx uint64 = 2
|
||||
s.Create("/foo", false, "bar", false, time.Now().Add(500*time.Millisecond))
|
||||
s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(500*time.Millisecond))
|
||||
s.Create("/foo", false, "bar", false, fc.Now().Add(500*time.Millisecond))
|
||||
s.Create("/foofoo", false, "barbarbar", false, fc.Now().Add(500*time.Millisecond))
|
||||
|
||||
w, _ := s.Watch("/", true, false, 0)
|
||||
assert.Equal(t, w.StartIndex(), eidx, "")
|
||||
c := w.EventChan()
|
||||
e := nbselect(c)
|
||||
assert.Nil(t, e, "")
|
||||
time.Sleep(600 * time.Millisecond)
|
||||
fc.Tick(600 * time.Millisecond)
|
||||
s.DeleteExpiredKeys(fc.Now())
|
||||
eidx = 3
|
||||
e = nbselect(c)
|
||||
assert.Equal(t, e.EtcdIndex, eidx, "")
|
||||
@ -790,32 +783,25 @@ func TestStoreRecover(t *testing.T) {
|
||||
// Ensure that the store can recover from a previously saved state that includes an expiring key.
|
||||
func TestStoreRecoverWithExpiration(t *testing.T) {
|
||||
s := newStore()
|
||||
s.clock = clockwork.NewFakeClock()
|
||||
|
||||
c := make(chan bool)
|
||||
defer func() {
|
||||
c <- true
|
||||
}()
|
||||
go mockSyncService(s.DeleteExpiredKeys, c)
|
||||
fc := clockwork.NewFakeClock()
|
||||
|
||||
var eidx uint64 = 4
|
||||
s.Create("/foo", true, "", false, Permanent)
|
||||
s.Create("/foo/x", false, "bar", false, Permanent)
|
||||
s.Create("/foo/y", false, "baz", false, time.Now().Add(5*time.Millisecond))
|
||||
s.Create("/foo/y", false, "baz", false, fc.Now().Add(5*time.Millisecond))
|
||||
b, err := s.Save()
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
s2 := newStore()
|
||||
|
||||
c2 := make(chan bool)
|
||||
defer func() {
|
||||
c2 <- true
|
||||
}()
|
||||
go mockSyncService(s2.DeleteExpiredKeys, c2)
|
||||
s2.clock = fc
|
||||
|
||||
s2.Recovery(b)
|
||||
|
||||
time.Sleep(600 * time.Millisecond)
|
||||
fc.Tick(600 * time.Millisecond)
|
||||
s.DeleteExpiredKeys(fc.Now())
|
||||
|
||||
e, err := s.Get("/foo/x", false, false)
|
||||
assert.Nil(t, err, "")
|
||||
@ -908,24 +894,22 @@ func TestStoreWatchRecursiveDeleteWithHiddenKey(t *testing.T) {
|
||||
// Ensure that the store doesn't see expirations of hidden keys.
|
||||
func TestStoreWatchExpireWithHiddenKey(t *testing.T) {
|
||||
s := newStore()
|
||||
fc := clockwork.NewFakeClock()
|
||||
s.clock = fc
|
||||
|
||||
stopChan := make(chan bool)
|
||||
defer func() {
|
||||
stopChan <- true
|
||||
}()
|
||||
go mockSyncService(s.DeleteExpiredKeys, stopChan)
|
||||
|
||||
s.Create("/_foo", false, "bar", false, time.Now().Add(500*time.Millisecond))
|
||||
s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(1000*time.Millisecond))
|
||||
s.Create("/_foo", false, "bar", false, fc.Now().Add(500*time.Millisecond))
|
||||
s.Create("/foofoo", false, "barbarbar", false, fc.Now().Add(1000*time.Millisecond))
|
||||
|
||||
w, _ := s.Watch("/", true, false, 0)
|
||||
c := w.EventChan()
|
||||
e := nbselect(c)
|
||||
assert.Nil(t, e, "")
|
||||
time.Sleep(600 * time.Millisecond)
|
||||
fc.Tick(600 * time.Millisecond)
|
||||
s.DeleteExpiredKeys(fc.Now())
|
||||
e = nbselect(c)
|
||||
assert.Nil(t, e, "")
|
||||
time.Sleep(600 * time.Millisecond)
|
||||
fc.Tick(600 * time.Millisecond)
|
||||
s.DeleteExpiredKeys(fc.Now())
|
||||
e = nbselect(c)
|
||||
assert.Equal(t, e.Action, "expire", "")
|
||||
assert.Equal(t, e.Node.Key, "/foofoo", "")
|
||||
@ -969,15 +953,3 @@ func nbselect(c <-chan *Event) *Event {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func mockSyncService(f func(now time.Time), c chan bool) {
|
||||
ticker := time.Tick(time.Millisecond * 500)
|
||||
for {
|
||||
select {
|
||||
case <-c:
|
||||
return
|
||||
case now := <-ticker:
|
||||
f(now)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user