Merge pull request #4164 from cchamplin/ttl-refresh

store/httpapi: support refresh ttl without firing watch
This commit is contained in:
Xiang Li 2016-02-08 13:20:34 -08:00
commit 0fde354eba
16 changed files with 452 additions and 160 deletions

View File

@ -234,6 +234,50 @@ curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d value=bar -d ttl= -d prevExist=t
}
```
### Refreshing key TTL
Keys in etcd can be refreshed notifying watchers
this can be achieved by setting the refresh to true when updating a TTL
You cannot update the value of a key when refreshing it
```sh
curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d value=bar -d ttl=5
curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d ttl=5 -d refresh=true -d prevExist=true
```
```json
{
"action": "set",
"node": {
"createdIndex": 5,
"expiration": "2013-12-04T12:01:21.874888581-08:00",
"key": "/foo",
"modifiedIndex": 5,
"ttl": 5,
"value": "bar"
}
}
{
"action":"update",
"node":{
"key":"/foo",
"value":"bar",
"expiration": "2013-12-04T12:01:26.874888581-08:00",
"ttl":5,
"modifiedIndex":6,
"createdIndex":5
},
"prevNode":{
"key":"/foo",
"value":"bar",
"expiration":"2013-12-04T12:01:21.874888581-08:00",
"ttl":3,
"modifiedIndex":5,
"createdIndex":5
}
}
```
### Waiting for a change

View File

@ -184,6 +184,11 @@ type SetOptions struct {
// a TTL of 0.
TTL time.Duration
// When refresh is set to true a TTL value can be updated
// without firing a watch or changing the node value. A
// value must not provided when refreshing a key.
Refresh bool
// Dir specifies whether or not this Node should be created as a directory.
Dir bool
}
@ -327,6 +332,7 @@ func (k *httpKeysAPI) Set(ctx context.Context, key, val string, opts *SetOptions
act.PrevIndex = opts.PrevIndex
act.PrevExist = opts.PrevExist
act.TTL = opts.TTL
act.Refresh = opts.Refresh
act.Dir = opts.Dir
}
@ -518,6 +524,7 @@ type setAction struct {
PrevIndex uint64
PrevExist PrevExistType
TTL time.Duration
Refresh bool
Dir bool
}
@ -549,6 +556,10 @@ func (a *setAction) HTTPRequest(ep url.URL) *http.Request {
form.Add("ttl", strconv.FormatUint(uint64(a.TTL.Seconds()), 10))
}
if a.Refresh {
form.Add("refresh", "true")
}
u.RawQuery = params.Encode()
body := strings.NewReader(form.Encode())

View File

@ -356,6 +356,18 @@ func TestSetAction(t *testing.T) {
wantURL: "http://example.com/foo",
wantBody: "ttl=180&value=",
},
// Refresh is set
{
act: setAction{
Key: "foo",
TTL: 3 * time.Minute,
Refresh: true,
},
wantURL: "http://example.com/foo",
wantBody: "refresh=true&ttl=180&value=",
},
// Dir is set
{
act: setAction{

View File

@ -48,6 +48,8 @@ var errors = map[int]string{
ecodeIndexValueMutex: "Index and value cannot both be specified",
EcodeInvalidField: "Invalid field",
EcodeInvalidForm: "Invalid POST form",
EcodeRefreshValue: "Value provided on refresh",
EcodeRefreshTTLRequired: "A TTL must be provided on refresh",
// raft related errors
EcodeRaftInternal: "Raft Internal Error",
@ -99,6 +101,8 @@ const (
ecodeIndexValueMutex = 208
EcodeInvalidField = 209
EcodeInvalidForm = 210
EcodeRefreshValue = 211
EcodeRefreshTTLRequired = 212
EcodeRaftInternal = 300
EcodeLeaderElect = 301

View File

@ -306,7 +306,7 @@ func (c *cluster) AddMember(m *Member) {
plog.Panicf("marshal raftAttributes should never fail: %v", err)
}
p := path.Join(memberStoreKey(m.ID), raftAttributesSuffix)
if _, err := c.store.Create(p, false, string(b), false, store.Permanent); err != nil {
if _, err := c.store.Create(p, false, string(b), false, store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil {
plog.Panicf("create raftAttributes should never fail: %v", err)
}
c.members[m.ID] = m
@ -321,7 +321,7 @@ func (c *cluster) RemoveMember(id types.ID) {
plog.Panicf("delete member should never fail: %v", err)
}
delete(c.members, id)
if _, err := c.store.Create(removedMemberStoreKey(id), false, "", false, store.Permanent); err != nil {
if _, err := c.store.Create(removedMemberStoreKey(id), false, "", false, store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil {
plog.Panicf("create removedMember should never fail: %v", err)
}
c.removed[id] = true
@ -352,7 +352,7 @@ func (c *cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
plog.Panicf("marshal raftAttributes should never fail: %v", err)
}
p := path.Join(memberStoreKey(id), raftAttributesSuffix)
if _, err := c.store.Update(p, string(b), store.Permanent); err != nil {
if _, err := c.store.Update(p, string(b), store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil {
plog.Panicf("update raftAttributes should never fail: %v", err)
}
c.members[id].RaftAttributes = raftAttr

View File

@ -460,7 +460,7 @@ func TestClusterAddMember(t *testing.T) {
false,
`{"peerURLs":null}`,
false,
store.Permanent,
store.TTLOptionSet{ExpireTime: store.Permanent},
},
},
}
@ -499,7 +499,7 @@ func TestClusterRemoveMember(t *testing.T) {
wactions := []testutil.Action{
{Name: "Delete", Params: []interface{}{memberStoreKey(1), true, true}},
{Name: "Create", Params: []interface{}{removedMemberStoreKey(1), false, "", false, store.Permanent}},
{Name: "Create", Params: []interface{}{removedMemberStoreKey(1), false, "", false, store.TTLOptionSet{ExpireTime: store.Permanent}}},
}
if !reflect.DeepEqual(st.Action(), wactions) {
t.Errorf("actions = %v, want %v", st.Action(), wactions)

View File

@ -558,6 +558,34 @@ func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Reque
pe = &bv
}
// refresh is nullable, so leave it null if not specified
var refresh *bool
if _, ok := r.Form["refresh"]; ok {
bv, err := getBool(r.Form, "refresh")
if err != nil {
return emptyReq, etcdErr.NewRequestError(
etcdErr.EcodeInvalidField,
"invalid value for refresh",
)
}
refresh = &bv
if refresh != nil && *refresh {
val := r.FormValue("value")
if _, ok := r.Form["value"]; ok && val != "" {
return emptyReq, etcdErr.NewRequestError(
etcdErr.EcodeRefreshValue,
`A value was provided on a refresh`,
)
}
if ttl == nil {
return emptyReq, etcdErr.NewRequestError(
etcdErr.EcodeRefreshTTLRequired,
`No TTL value set`,
)
}
}
}
rr := etcdserverpb.Request{
Method: r.Method,
Path: p,
@ -578,6 +606,10 @@ func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Reque
rr.PrevExist = pe
}
if refresh != nil {
rr.Refresh = refresh
}
// Null TTL is equivalent to unset Expiration
if ttl != nil {
expr := time.Duration(*ttl) * time.Second

View File

@ -172,6 +172,49 @@ func (w *dummyWatcher) EventChan() chan *store.Event {
func (w *dummyWatcher) StartIndex() uint64 { return w.sidx }
func (w *dummyWatcher) Remove() {}
func TestBadRefreshRequest(t *testing.T) {
tests := []struct {
in *http.Request
wcode int
}{
{
mustNewRequest(t, "foo?refresh=true&value=test"),
etcdErr.EcodeRefreshValue,
},
{
mustNewRequest(t, "foo?refresh=true&value=10"),
etcdErr.EcodeRefreshValue,
},
{
mustNewRequest(t, "foo?refresh=true"),
etcdErr.EcodeRefreshTTLRequired,
},
{
mustNewRequest(t, "foo?refresh=true&ttl="),
etcdErr.EcodeRefreshTTLRequired,
},
}
for i, tt := range tests {
got, err := parseKeyRequest(tt.in, clockwork.NewFakeClock())
if err == nil {
t.Errorf("#%d: unexpected nil error!", i)
continue
}
ee, ok := err.(*etcdErr.Error)
if !ok {
t.Errorf("#%d: err is not etcd.Error!", i)
continue
}
if ee.ErrorCode != tt.wcode {
t.Errorf("#%d: code=%d, want %v", i, ee.ErrorCode, tt.wcode)
t.Logf("cause: %#v", ee.Cause)
}
if !reflect.DeepEqual(got, etcdserverpb.Request{}) {
t.Errorf("#%d: unexpected non-empty Request: %#v", i, got)
}
}
}
func TestBadParseRequest(t *testing.T) {
tests := []struct {
in *http.Request

View File

@ -85,6 +85,7 @@ type Request struct {
Quorum bool `protobuf:"varint,14,opt,name=Quorum" json:"Quorum"`
Time int64 `protobuf:"varint,15,opt,name=Time" json:"Time"`
Stream bool `protobuf:"varint,16,opt,name=Stream" json:"Stream"`
Refresh *bool `protobuf:"varint,17,opt,name=Refresh" json:"Refresh,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -212,6 +213,18 @@ func (m *Request) MarshalTo(data []byte) (int, error) {
data[i] = 0
}
i++
if m.Refresh != nil {
data[i] = 0x88
i++
data[i] = 0x1
i++
if *m.Refresh {
data[i] = 1
} else {
data[i] = 0
}
i++
}
if m.XXX_unrecognized != nil {
i += copy(data[i:], m.XXX_unrecognized)
}
@ -297,6 +310,9 @@ func (m *Request) Size() (n int) {
n += 2
n += 1 + sovEtcdserver(uint64(m.Time))
n += 3
if m.Refresh != nil {
n += 3
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
@ -708,6 +724,27 @@ func (m *Request) Unmarshal(data []byte) error {
}
}
m.Stream = bool(v != 0)
case 17:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Refresh", wireType)
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEtcdserver
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
v |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
b := bool(v != 0)
m.Refresh = &b
default:
iNdEx = preIndex
skippy, err := skipEtcdserver(data[iNdEx:])

View File

@ -25,6 +25,7 @@ message Request {
optional bool Quorum = 14 [(gogoproto.nullable) = false];
optional int64 Time = 15 [(gogoproto.nullable) = false];
optional bool Stream = 16 [(gogoproto.nullable) = false];
optional bool Refresh = 17 [(gogoproto.nullable) = true];
}
message Metadata {

View File

@ -1049,23 +1049,25 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
return Response{Event: ev, err: err}
}
expr := timeutil.UnixNanoToTime(r.Expiration)
refresh, _ := pbutil.GetBool(r.Refresh)
ttlOptions := store.TTLOptionSet{ExpireTime: expr, Refresh: refresh}
switch r.Method {
case "POST":
return f(s.store.Create(r.Path, r.Dir, r.Val, true, expr))
return f(s.store.Create(r.Path, r.Dir, r.Val, true, ttlOptions))
case "PUT":
exists, existsSet := pbutil.GetBool(r.PrevExist)
switch {
case existsSet:
if exists {
if r.PrevIndex == 0 && r.PrevValue == "" {
return f(s.store.Update(r.Path, r.Val, expr))
return f(s.store.Update(r.Path, r.Val, ttlOptions))
} else {
return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr))
return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
}
}
return f(s.store.Create(r.Path, r.Dir, r.Val, false, expr))
return f(s.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions))
case r.PrevIndex > 0 || r.PrevValue != "":
return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr))
return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
default:
// TODO (yicheng): cluster should be the owner of cluster prefix store
// we should not modify cluster store here.
@ -1083,7 +1085,7 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
if r.Path == path.Join(StoreClusterPrefix, "version") {
s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)))
}
return f(s.store.Set(r.Path, r.Dir, r.Val, expr))
return f(s.store.Set(r.Path, r.Dir, r.Val, ttlOptions))
}
case "DELETE":
switch {

View File

@ -229,7 +229,7 @@ func TestApplyRequest(t *testing.T) {
[]testutil.Action{
{
Name: "Create",
Params: []interface{}{"", false, "", true, time.Time{}},
Params: []interface{}{"", false, "", true, store.TTLOptionSet{ExpireTime: time.Time{}}},
},
},
},
@ -240,7 +240,7 @@ func TestApplyRequest(t *testing.T) {
[]testutil.Action{
{
Name: "Create",
Params: []interface{}{"", false, "", true, time.Unix(0, 1337)},
Params: []interface{}{"", false, "", true, store.TTLOptionSet{ExpireTime: time.Unix(0, 1337)}},
},
},
},
@ -251,7 +251,7 @@ func TestApplyRequest(t *testing.T) {
[]testutil.Action{
{
Name: "Create",
Params: []interface{}{"", true, "", true, time.Time{}},
Params: []interface{}{"", true, "", true, store.TTLOptionSet{ExpireTime: time.Time{}}},
},
},
},
@ -262,7 +262,7 @@ func TestApplyRequest(t *testing.T) {
[]testutil.Action{
{
Name: "Set",
Params: []interface{}{"", false, "", time.Time{}},
Params: []interface{}{"", false, "", store.TTLOptionSet{ExpireTime: time.Time{}}},
},
},
},
@ -273,7 +273,7 @@ func TestApplyRequest(t *testing.T) {
[]testutil.Action{
{
Name: "Set",
Params: []interface{}{"", true, "", time.Time{}},
Params: []interface{}{"", true, "", store.TTLOptionSet{ExpireTime: time.Time{}}},
},
},
},
@ -284,7 +284,7 @@ func TestApplyRequest(t *testing.T) {
[]testutil.Action{
{
Name: "Update",
Params: []interface{}{"", "", time.Time{}},
Params: []interface{}{"", "", store.TTLOptionSet{ExpireTime: time.Time{}}},
},
},
},
@ -295,7 +295,7 @@ func TestApplyRequest(t *testing.T) {
[]testutil.Action{
{
Name: "Create",
Params: []interface{}{"", false, "", false, time.Time{}},
Params: []interface{}{"", false, "", false, store.TTLOptionSet{ExpireTime: time.Time{}}},
},
},
},
@ -306,7 +306,7 @@ func TestApplyRequest(t *testing.T) {
[]testutil.Action{
{
Name: "CompareAndSwap",
Params: []interface{}{"", "", uint64(1), "", time.Time{}},
Params: []interface{}{"", "", uint64(1), "", store.TTLOptionSet{ExpireTime: time.Time{}}},
},
},
},
@ -317,7 +317,7 @@ func TestApplyRequest(t *testing.T) {
[]testutil.Action{
{
Name: "Create",
Params: []interface{}{"", false, "", false, time.Time{}},
Params: []interface{}{"", false, "", false, store.TTLOptionSet{ExpireTime: time.Time{}}},
},
},
},
@ -328,7 +328,7 @@ func TestApplyRequest(t *testing.T) {
[]testutil.Action{
{
Name: "CompareAndSwap",
Params: []interface{}{"", "", uint64(1), "", time.Time{}},
Params: []interface{}{"", "", uint64(1), "", store.TTLOptionSet{ExpireTime: time.Time{}}},
},
},
},
@ -339,7 +339,7 @@ func TestApplyRequest(t *testing.T) {
[]testutil.Action{
{
Name: "CompareAndSwap",
Params: []interface{}{"", "bar", uint64(0), "", time.Time{}},
Params: []interface{}{"", "bar", uint64(0), "", store.TTLOptionSet{ExpireTime: time.Time{}}},
},
},
},
@ -350,7 +350,7 @@ func TestApplyRequest(t *testing.T) {
[]testutil.Action{
{
Name: "CompareAndSwap",
Params: []interface{}{"", "bar", uint64(1), "", time.Time{}},
Params: []interface{}{"", "bar", uint64(1), "", store.TTLOptionSet{ExpireTime: time.Time{}}},
},
},
},

View File

@ -24,7 +24,7 @@ import (
// Ensure that a successful Get is recorded in the stats.
func TestStoreStatsGetSuccess(t *testing.T) {
s := newStore()
s.Create("/foo", false, "bar", false, Permanent)
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
s.Get("/foo", false, false)
assert.Equal(t, uint64(1), s.Stats.GetSuccess, "")
}
@ -32,7 +32,7 @@ func TestStoreStatsGetSuccess(t *testing.T) {
// Ensure that a failed Get is recorded in the stats.
func TestStoreStatsGetFail(t *testing.T) {
s := newStore()
s.Create("/foo", false, "bar", false, Permanent)
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
s.Get("/no_such_key", false, false)
assert.Equal(t, uint64(1), s.Stats.GetFail, "")
}
@ -40,53 +40,53 @@ func TestStoreStatsGetFail(t *testing.T) {
// Ensure that a successful Create is recorded in the stats.
func TestStoreStatsCreateSuccess(t *testing.T) {
s := newStore()
s.Create("/foo", false, "bar", false, Permanent)
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
assert.Equal(t, uint64(1), s.Stats.CreateSuccess, "")
}
// Ensure that a failed Create is recorded in the stats.
func TestStoreStatsCreateFail(t *testing.T) {
s := newStore()
s.Create("/foo", true, "", false, Permanent)
s.Create("/foo", false, "bar", false, Permanent)
s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent})
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
assert.Equal(t, uint64(1), s.Stats.CreateFail, "")
}
// Ensure that a successful Update is recorded in the stats.
func TestStoreStatsUpdateSuccess(t *testing.T) {
s := newStore()
s.Create("/foo", false, "bar", false, Permanent)
s.Update("/foo", "baz", Permanent)
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
s.Update("/foo", "baz", TTLOptionSet{ExpireTime: Permanent})
assert.Equal(t, uint64(1), s.Stats.UpdateSuccess, "")
}
// Ensure that a failed Update is recorded in the stats.
func TestStoreStatsUpdateFail(t *testing.T) {
s := newStore()
s.Update("/foo", "bar", Permanent)
s.Update("/foo", "bar", TTLOptionSet{ExpireTime: Permanent})
assert.Equal(t, uint64(1), s.Stats.UpdateFail, "")
}
// Ensure that a successful CAS is recorded in the stats.
func TestStoreStatsCompareAndSwapSuccess(t *testing.T) {
s := newStore()
s.Create("/foo", false, "bar", false, Permanent)
s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent)
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
s.CompareAndSwap("/foo", "bar", 0, "baz", TTLOptionSet{ExpireTime: Permanent})
assert.Equal(t, uint64(1), s.Stats.CompareAndSwapSuccess, "")
}
// Ensure that a failed CAS is recorded in the stats.
func TestStoreStatsCompareAndSwapFail(t *testing.T) {
s := newStore()
s.Create("/foo", false, "bar", false, Permanent)
s.CompareAndSwap("/foo", "wrong_value", 0, "baz", Permanent)
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
s.CompareAndSwap("/foo", "wrong_value", 0, "baz", TTLOptionSet{ExpireTime: Permanent})
assert.Equal(t, uint64(1), s.Stats.CompareAndSwapFail, "")
}
// Ensure that a successful Delete is recorded in the stats.
func TestStoreStatsDeleteSuccess(t *testing.T) {
s := newStore()
s.Create("/foo", false, "bar", false, Permanent)
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
s.Delete("/foo", false, false)
assert.Equal(t, uint64(1), s.Stats.DeleteSuccess, "")
}
@ -104,7 +104,7 @@ func TestStoreStatsExpireCount(t *testing.T) {
fc := newFakeClock()
s.clock = fc
s.Create("/foo", false, "bar", false, fc.Now().Add(500*time.Millisecond))
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond)})
assert.Equal(t, uint64(0), s.Stats.ExpireCount, "")
fc.Advance(600 * time.Millisecond)
s.DeleteExpiredKeys(fc.Now())

View File

@ -43,12 +43,12 @@ type Store interface {
Index() uint64
Get(nodePath string, recursive, sorted bool) (*Event, error)
Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error)
Update(nodePath string, newValue string, expireTime time.Time) (*Event, error)
Set(nodePath string, dir bool, value string, expireOpts TTLOptionSet) (*Event, error)
Update(nodePath string, newValue string, expireOpts TTLOptionSet) (*Event, error)
Create(nodePath string, dir bool, value string, unique bool,
expireTime time.Time) (*Event, error)
expireOpts TTLOptionSet) (*Event, error)
CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
value string, expireTime time.Time) (*Event, error)
value string, expireOpts TTLOptionSet) (*Event, error)
Delete(nodePath string, dir, recursive bool) (*Event, error)
CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)
@ -64,6 +64,11 @@ type Store interface {
DeleteExpiredKeys(cutoff time.Time)
}
type TTLOptionSet struct {
ExpireTime time.Time
Refresh bool
}
type store struct {
Root *node
WatcherHub *watcherHub
@ -154,7 +159,7 @@ func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
// Create creates the node at nodePath. Create will help to create intermediate directories with no ttl.
// If the node has already existed, create will fail.
// If any node on the path is a file, create will fail.
func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireTime time.Time) (*Event, error) {
func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireOpts TTLOptionSet) (*Event, error) {
var err *etcdErr.Error
s.worldLock.Lock()
@ -171,7 +176,7 @@ func (s *store) Create(nodePath string, dir bool, value string, unique bool, exp
reportWriteFailure(Create)
}()
e, err := s.internalCreate(nodePath, dir, value, unique, false, expireTime, Create)
e, err := s.internalCreate(nodePath, dir, value, unique, false, expireOpts.ExpireTime, Create)
if err != nil {
return nil, err
}
@ -183,7 +188,7 @@ func (s *store) Create(nodePath string, dir bool, value string, unique bool, exp
}
// Set creates or replace the node at nodePath.
func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error) {
func (s *store) Set(nodePath string, dir bool, value string, expireOpts TTLOptionSet) (*Event, error) {
var err *etcdErr.Error
s.worldLock.Lock()
@ -207,8 +212,17 @@ func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Tim
return nil, err
}
if expireOpts.Refresh {
if getErr != nil {
err = getErr
return nil, err
} else {
value = n.Value
}
}
// Set new value
e, err := s.internalCreate(nodePath, dir, value, false, true, expireTime, Set)
e, err := s.internalCreate(nodePath, dir, value, false, true, expireOpts.ExpireTime, Set)
if err != nil {
return nil, err
}
@ -221,7 +235,9 @@ func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Tim
e.PrevNode = prev.Node
}
s.WatcherHub.notify(e)
if !expireOpts.Refresh {
s.WatcherHub.notify(e)
}
return e, nil
}
@ -239,7 +255,7 @@ func getCompareFailCause(n *node, which int, prevValue string, prevIndex uint64)
}
func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
value string, expireTime time.Time) (*Event, error) {
value string, expireOpts TTLOptionSet) (*Event, error) {
var err *etcdErr.Error
@ -290,14 +306,16 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
// if test succeed, write the value
n.Write(value, s.CurrentIndex)
n.UpdateTTL(expireTime)
n.UpdateTTL(expireOpts.ExpireTime)
// copy the value for safety
valueCopy := value
eNode.Value = &valueCopy
eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
s.WatcherHub.notify(e)
if !expireOpts.Refresh {
s.WatcherHub.notify(e)
}
return e, nil
}
@ -462,7 +480,7 @@ func (s *store) walk(nodePath string, walkFunc func(prev *node, component string
// Update updates the value/ttl of the node.
// If the node is a file, the value and the ttl can be updated.
// If the node is a directory, only the ttl can be updated.
func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (*Event, error) {
func (s *store) Update(nodePath string, newValue string, expireOpts TTLOptionSet) (*Event, error) {
var err *etcdErr.Error
s.worldLock.Lock()
@ -496,6 +514,10 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
}
if expireOpts.Refresh {
newValue = n.Value
}
e := newEvent(Update, nodePath, nextIndex, n.CreatedIndex)
e.EtcdIndex = nextIndex
e.PrevNode = n.Repr(false, false, s.clock)
@ -512,11 +534,13 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (
}
// update ttl
n.UpdateTTL(expireTime)
n.UpdateTTL(expireOpts.ExpireTime)
eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
s.WatcherHub.notify(e)
if !expireOpts.Refresh {
s.WatcherHub.notify(e)
}
s.CurrentIndex = nextIndex
@ -778,31 +802,31 @@ func (s *storeRecorder) Get(path string, recursive, sorted bool) (*Event, error)
})
return &Event{}, nil
}
func (s *storeRecorder) Set(path string, dir bool, val string, expr time.Time) (*Event, error) {
func (s *storeRecorder) Set(path string, dir bool, val string, expireOpts TTLOptionSet) (*Event, error) {
s.Record(testutil.Action{
Name: "Set",
Params: []interface{}{path, dir, val, expr},
Params: []interface{}{path, dir, val, expireOpts},
})
return &Event{}, nil
}
func (s *storeRecorder) Update(path, val string, expr time.Time) (*Event, error) {
func (s *storeRecorder) Update(path, val string, expireOpts TTLOptionSet) (*Event, error) {
s.Record(testutil.Action{
Name: "Update",
Params: []interface{}{path, val, expr},
Params: []interface{}{path, val, expireOpts},
})
return &Event{}, nil
}
func (s *storeRecorder) Create(path string, dir bool, val string, uniq bool, exp time.Time) (*Event, error) {
func (s *storeRecorder) Create(path string, dir bool, val string, uniq bool, expireOpts TTLOptionSet) (*Event, error) {
s.Record(testutil.Action{
Name: "Create",
Params: []interface{}{path, dir, val, uniq, exp},
Params: []interface{}{path, dir, val, uniq, expireOpts},
})
return &Event{}, nil
}
func (s *storeRecorder) CompareAndSwap(path, prevVal string, prevIdx uint64, val string, expr time.Time) (*Event, error) {
func (s *storeRecorder) CompareAndSwap(path, prevVal string, prevIdx uint64, val string, expireOpts TTLOptionSet) (*Event, error) {
s.Record(testutil.Action{
Name: "CompareAndSwap",
Params: []interface{}{path, prevVal, prevIdx, val, expr},
Params: []interface{}{path, prevVal, prevIdx, val, expireOpts},
})
return &Event{}, nil
}

View File

@ -56,7 +56,7 @@ func BenchmarkStoreDelete(b *testing.B) {
runtime.ReadMemStats(memStats)
for i := 0; i < b.N; i++ {
_, err := s.Set(kvs[i][0], false, kvs[i][1], Permanent)
_, err := s.Set(kvs[i][0], false, kvs[i][1], TTLOptionSet{ExpireTime: Permanent})
if err != nil {
panic(err)
}
@ -132,7 +132,7 @@ func BenchmarkWatchWithSet(b *testing.B) {
for i := 0; i < b.N; i++ {
w, _ := s.Watch(kvs[i][0], false, false, 0)
s.Set(kvs[i][0], false, "test", Permanent)
s.Set(kvs[i][0], false, "test", TTLOptionSet{ExpireTime: Permanent})
<-w.EventChan()
}
}
@ -150,7 +150,7 @@ func BenchmarkWatchWithSetBatch(b *testing.B) {
}
for i := 0; i < b.N; i++ {
s.Set(kvs[i][0], false, "test", Permanent)
s.Set(kvs[i][0], false, "test", TTLOptionSet{ExpireTime: Permanent})
}
for i := 0; i < b.N; i++ {
@ -167,7 +167,7 @@ func BenchmarkWatchOneKey(b *testing.B) {
watchers[i], _ = s.Watch("/foo", false, false, 0)
}
s.Set("/foo", false, "", Permanent)
s.Set("/foo", false, "", TTLOptionSet{ExpireTime: Permanent})
for i := 0; i < b.N; i++ {
<-watchers[i].EventChan()
@ -181,7 +181,7 @@ func benchStoreSet(b *testing.B, valueSize int, process func(interface{}) ([]byt
b.StartTimer()
for i := 0; i < b.N; i++ {
resp, err := s.Set(kvs[i][0], false, kvs[i][1], Permanent)
resp, err := s.Set(kvs[i][0], false, kvs[i][1], TTLOptionSet{ExpireTime: Permanent})
if err != nil {
panic(err)
}

View File

@ -35,7 +35,7 @@ func TestNewStoreWithNamespaces(t *testing.T) {
// Ensure that the store can retrieve an existing value.
func TestStoreGetValue(t *testing.T) {
s := newStore()
s.Create("/foo", false, "bar", false, Permanent)
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
var eidx uint64 = 1
e, err := s.Get("/foo", false, false)
assert.Nil(t, err, "")
@ -52,7 +52,7 @@ func TestMinExpireTime(t *testing.T) {
s.clock = fc
// FakeClock starts at 0, so minExpireTime should be far in the future.. but just in case
assert.True(t, minExpireTime.After(fc.Now()), "minExpireTime should be ahead of FakeClock!")
s.Create("/foo", false, "Y", false, fc.Now().Add(3*time.Second))
s.Create("/foo", false, "Y", false, TTLOptionSet{ExpireTime: fc.Now().Add(3 * time.Second)})
fc.Advance(5 * time.Second)
// Ensure it hasn't expired
s.DeleteExpiredKeys(fc.Now())
@ -71,13 +71,13 @@ func TestStoreGetDirectory(t *testing.T) {
s := newStore()
fc := newFakeClock()
s.clock = fc
s.Create("/foo", true, "", false, Permanent)
s.Create("/foo/bar", false, "X", false, Permanent)
s.Create("/foo/_hidden", false, "*", false, Permanent)
s.Create("/foo/baz", true, "", false, Permanent)
s.Create("/foo/baz/bat", false, "Y", false, Permanent)
s.Create("/foo/baz/_hidden", false, "*", false, Permanent)
s.Create("/foo/baz/ttl", false, "Y", false, fc.Now().Add(time.Second*3))
s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent})
s.Create("/foo/bar", false, "X", false, TTLOptionSet{ExpireTime: Permanent})
s.Create("/foo/_hidden", false, "*", false, TTLOptionSet{ExpireTime: Permanent})
s.Create("/foo/baz", true, "", false, TTLOptionSet{ExpireTime: Permanent})
s.Create("/foo/baz/bat", false, "Y", false, TTLOptionSet{ExpireTime: Permanent})
s.Create("/foo/baz/_hidden", false, "*", false, TTLOptionSet{ExpireTime: Permanent})
s.Create("/foo/baz/ttl", false, "Y", false, TTLOptionSet{ExpireTime: fc.Now().Add(time.Second * 3)})
var eidx uint64 = 7
e, err := s.Get("/foo", true, false)
assert.Nil(t, err, "")
@ -117,12 +117,12 @@ func TestStoreGetDirectory(t *testing.T) {
// Ensure that the store can retrieve a directory in sorted order.
func TestStoreGetSorted(t *testing.T) {
s := newStore()
s.Create("/foo", true, "", false, Permanent)
s.Create("/foo/x", false, "0", false, Permanent)
s.Create("/foo/z", false, "0", false, Permanent)
s.Create("/foo/y", true, "", false, Permanent)
s.Create("/foo/y/a", false, "0", false, Permanent)
s.Create("/foo/y/b", false, "0", false, Permanent)
s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent})
s.Create("/foo/x", false, "0", false, TTLOptionSet{ExpireTime: Permanent})
s.Create("/foo/z", false, "0", false, TTLOptionSet{ExpireTime: Permanent})
s.Create("/foo/y", true, "", false, TTLOptionSet{ExpireTime: Permanent})
s.Create("/foo/y/a", false, "0", false, TTLOptionSet{ExpireTime: Permanent})
s.Create("/foo/y/b", false, "0", false, TTLOptionSet{ExpireTime: Permanent})
var eidx uint64 = 6
e, err := s.Get("/foo", true, true)
assert.Nil(t, err, "")
@ -153,7 +153,7 @@ func TestSet(t *testing.T) {
// Set /foo=""
var eidx uint64 = 1
e, err := s.Set("/foo", false, "", Permanent)
e, err := s.Set("/foo", false, "", TTLOptionSet{ExpireTime: Permanent})
assert.Nil(t, err, "")
assert.Equal(t, e.EtcdIndex, eidx, "")
assert.Equal(t, e.Action, "set", "")
@ -167,7 +167,7 @@ func TestSet(t *testing.T) {
// Set /foo="bar"
eidx = 2
e, err = s.Set("/foo", false, "bar", Permanent)
e, err = s.Set("/foo", false, "bar", TTLOptionSet{ExpireTime: Permanent})
assert.Nil(t, err, "")
assert.Equal(t, e.EtcdIndex, eidx, "")
assert.Equal(t, e.Action, "set", "")
@ -185,7 +185,7 @@ func TestSet(t *testing.T) {
assert.Equal(t, e.PrevNode.ModifiedIndex, uint64(1), "")
// Set /foo="baz" (for testing prevNode)
eidx = 3
e, err = s.Set("/foo", false, "baz", Permanent)
e, err = s.Set("/foo", false, "baz", TTLOptionSet{ExpireTime: Permanent})
assert.Nil(t, err, "")
assert.Equal(t, e.EtcdIndex, eidx, "")
assert.Equal(t, e.Action, "set", "")
@ -204,7 +204,7 @@ func TestSet(t *testing.T) {
// Set /dir as a directory
eidx = 4
e, err = s.Set("/dir", true, "", Permanent)
e, err = s.Set("/dir", true, "", TTLOptionSet{ExpireTime: Permanent})
assert.Nil(t, err, "")
assert.Equal(t, e.EtcdIndex, eidx, "")
assert.Equal(t, e.Action, "set", "")
@ -222,7 +222,7 @@ func TestStoreCreateValue(t *testing.T) {
s := newStore()
// Create /foo=bar
var eidx uint64 = 1
e, err := s.Create("/foo", false, "bar", false, Permanent)
e, err := s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
assert.Nil(t, err, "")
assert.Equal(t, e.EtcdIndex, eidx, "")
assert.Equal(t, e.Action, "create", "")
@ -236,7 +236,7 @@ func TestStoreCreateValue(t *testing.T) {
// Create /empty=""
eidx = 2
e, err = s.Create("/empty", false, "", false, Permanent)
e, err = s.Create("/empty", false, "", false, TTLOptionSet{ExpireTime: Permanent})
assert.Nil(t, err, "")
assert.Equal(t, e.EtcdIndex, eidx, "")
assert.Equal(t, e.Action, "create", "")
@ -254,7 +254,7 @@ func TestStoreCreateValue(t *testing.T) {
func TestStoreCreateDirectory(t *testing.T) {
s := newStore()
var eidx uint64 = 1
e, err := s.Create("/foo", true, "", false, Permanent)
e, err := s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent})
assert.Nil(t, err, "")
assert.Equal(t, e.EtcdIndex, eidx, "")
assert.Equal(t, e.Action, "create", "")
@ -266,10 +266,10 @@ func TestStoreCreateDirectory(t *testing.T) {
func TestStoreCreateFailsIfExists(t *testing.T) {
s := newStore()
// create /foo as dir
s.Create("/foo", true, "", false, Permanent)
s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent})
// create /foo as dir again
e, _err := s.Create("/foo", true, "", false, Permanent)
e, _err := s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent})
err := _err.(*etcdErr.Error)
assert.Equal(t, err.ErrorCode, etcdErr.EcodeNodeExist, "")
assert.Equal(t, err.Message, "Key already exists", "")
@ -282,10 +282,10 @@ func TestStoreCreateFailsIfExists(t *testing.T) {
func TestStoreUpdateValue(t *testing.T) {
s := newStore()
// create /foo=bar
s.Create("/foo", false, "bar", false, Permanent)
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
// update /foo="bzr"
var eidx uint64 = 2
e, err := s.Update("/foo", "baz", Permanent)
e, err := s.Update("/foo", "baz", TTLOptionSet{ExpireTime: Permanent})
assert.Nil(t, err, "")
assert.Equal(t, e.EtcdIndex, eidx, "")
assert.Equal(t, e.Action, "update", "")
@ -306,7 +306,7 @@ func TestStoreUpdateValue(t *testing.T) {
// update /foo=""
eidx = 3
e, err = s.Update("/foo", "", Permanent)
e, err = s.Update("/foo", "", TTLOptionSet{ExpireTime: Permanent})
assert.Nil(t, err, "")
assert.Equal(t, e.EtcdIndex, eidx, "")
assert.Equal(t, e.Action, "update", "")
@ -329,8 +329,8 @@ func TestStoreUpdateValue(t *testing.T) {
// Ensure that the store cannot update a directory.
func TestStoreUpdateFailsIfDirectory(t *testing.T) {
s := newStore()
s.Create("/foo", true, "", false, Permanent)
e, _err := s.Update("/foo", "baz", Permanent)
s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent})
e, _err := s.Update("/foo", "baz", TTLOptionSet{ExpireTime: Permanent})
err := _err.(*etcdErr.Error)
assert.Equal(t, err.ErrorCode, etcdErr.EcodeNotFile, "")
assert.Equal(t, err.Message, "Not a file", "")
@ -345,8 +345,8 @@ func TestStoreUpdateValueTTL(t *testing.T) {
s.clock = fc
var eidx uint64 = 2
s.Create("/foo", false, "bar", false, Permanent)
_, err := s.Update("/foo", "baz", fc.Now().Add(500*time.Millisecond))
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
_, err := s.Update("/foo", "baz", TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond)})
e, _ := s.Get("/foo", false, false)
assert.Equal(t, *e.Node.Value, "baz", "")
assert.Equal(t, e.EtcdIndex, eidx, "")
@ -364,9 +364,9 @@ func TestStoreUpdateDirTTL(t *testing.T) {
s.clock = fc
var eidx uint64 = 3
s.Create("/foo", true, "", false, Permanent)
s.Create("/foo/bar", false, "baz", false, Permanent)
e, err := s.Update("/foo", "", fc.Now().Add(500*time.Millisecond))
s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent})
s.Create("/foo/bar", false, "baz", false, TTLOptionSet{ExpireTime: Permanent})
e, err := s.Update("/foo", "", TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond)})
assert.Equal(t, e.Node.Dir, true, "")
assert.Equal(t, e.EtcdIndex, eidx, "")
e, _ = s.Get("/foo/bar", false, false)
@ -384,7 +384,7 @@ func TestStoreUpdateDirTTL(t *testing.T) {
func TestStoreDeleteValue(t *testing.T) {
s := newStore()
var eidx uint64 = 2
s.Create("/foo", false, "bar", false, Permanent)
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
e, err := s.Delete("/foo", false, false)
assert.Nil(t, err, "")
assert.Equal(t, e.EtcdIndex, eidx, "")
@ -400,7 +400,7 @@ func TestStoreDeleteDiretory(t *testing.T) {
s := newStore()
// create directory /foo
var eidx uint64 = 2
s.Create("/foo", true, "", false, Permanent)
s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent})
// delete /foo with dir = true and recursive = false
// this should succeed, since the directory is empty
e, err := s.Delete("/foo", true, false)
@ -413,7 +413,7 @@ func TestStoreDeleteDiretory(t *testing.T) {
assert.Equal(t, e.PrevNode.Dir, true, "")
// create directory /foo and directory /foo/bar
s.Create("/foo/bar", true, "", false, Permanent)
s.Create("/foo/bar", true, "", false, TTLOptionSet{ExpireTime: Permanent})
// delete /foo with dir = true and recursive = false
// this should fail, since the directory is not empty
_, err = s.Delete("/foo", true, false)
@ -433,7 +433,7 @@ func TestStoreDeleteDiretory(t *testing.T) {
// and dir are not specified.
func TestStoreDeleteDiretoryFailsIfNonRecursiveAndDir(t *testing.T) {
s := newStore()
s.Create("/foo", true, "", false, Permanent)
s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent})
e, _err := s.Delete("/foo", false, false)
err := _err.(*etcdErr.Error)
assert.Equal(t, err.ErrorCode, etcdErr.EcodeNotFile, "")
@ -445,19 +445,19 @@ func TestRootRdOnly(t *testing.T) {
s := newStore("/0")
for _, tt := range []string{"/", "/0"} {
_, err := s.Set(tt, true, "", Permanent)
_, err := s.Set(tt, true, "", TTLOptionSet{ExpireTime: Permanent})
assert.NotNil(t, err, "")
_, err = s.Delete(tt, true, true)
assert.NotNil(t, err, "")
_, err = s.Create(tt, true, "", false, Permanent)
_, err = s.Create(tt, true, "", false, TTLOptionSet{ExpireTime: Permanent})
assert.NotNil(t, err, "")
_, err = s.Update(tt, "", Permanent)
_, err = s.Update(tt, "", TTLOptionSet{ExpireTime: Permanent})
assert.NotNil(t, err, "")
_, err = s.CompareAndSwap(tt, "", 0, "", Permanent)
_, err = s.CompareAndSwap(tt, "", 0, "", TTLOptionSet{ExpireTime: Permanent})
assert.NotNil(t, err, "")
}
}
@ -465,7 +465,7 @@ func TestRootRdOnly(t *testing.T) {
func TestStoreCompareAndDeletePrevValue(t *testing.T) {
s := newStore()
var eidx uint64 = 2
s.Create("/foo", false, "bar", false, Permanent)
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
e, err := s.CompareAndDelete("/foo", "bar", 0)
assert.Nil(t, err, "")
assert.Equal(t, e.EtcdIndex, eidx, "")
@ -483,7 +483,7 @@ func TestStoreCompareAndDeletePrevValue(t *testing.T) {
func TestStoreCompareAndDeletePrevValueFailsIfNotMatch(t *testing.T) {
s := newStore()
var eidx uint64 = 1
s.Create("/foo", false, "bar", false, Permanent)
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
e, _err := s.CompareAndDelete("/foo", "baz", 0)
err := _err.(*etcdErr.Error)
assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "")
@ -497,7 +497,7 @@ func TestStoreCompareAndDeletePrevValueFailsIfNotMatch(t *testing.T) {
func TestStoreCompareAndDeletePrevIndex(t *testing.T) {
s := newStore()
var eidx uint64 = 2
s.Create("/foo", false, "bar", false, Permanent)
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
e, err := s.CompareAndDelete("/foo", "", 1)
assert.Nil(t, err, "")
assert.Equal(t, e.EtcdIndex, eidx, "")
@ -513,7 +513,7 @@ func TestStoreCompareAndDeletePrevIndex(t *testing.T) {
func TestStoreCompareAndDeletePrevIndexFailsIfNotMatch(t *testing.T) {
s := newStore()
var eidx uint64 = 1
s.Create("/foo", false, "bar", false, Permanent)
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
e, _err := s.CompareAndDelete("/foo", "", 100)
assert.NotNil(t, _err, "")
err := _err.(*etcdErr.Error)
@ -528,7 +528,7 @@ func TestStoreCompareAndDeletePrevIndexFailsIfNotMatch(t *testing.T) {
// Ensure that the store cannot delete a directory.
func TestStoreCompareAndDeleteDiretoryFail(t *testing.T) {
s := newStore()
s.Create("/foo", true, "", false, Permanent)
s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent})
_, _err := s.CompareAndDelete("/foo", "", 0)
assert.NotNil(t, _err, "")
err := _err.(*etcdErr.Error)
@ -539,8 +539,8 @@ func TestStoreCompareAndDeleteDiretoryFail(t *testing.T) {
func TestStoreCompareAndSwapPrevValue(t *testing.T) {
s := newStore()
var eidx uint64 = 2
s.Create("/foo", false, "bar", false, Permanent)
e, err := s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent)
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
e, err := s.CompareAndSwap("/foo", "bar", 0, "baz", TTLOptionSet{ExpireTime: Permanent})
assert.Nil(t, err, "")
assert.Equal(t, e.EtcdIndex, eidx, "")
assert.Equal(t, e.Action, "compareAndSwap", "")
@ -560,8 +560,8 @@ func TestStoreCompareAndSwapPrevValue(t *testing.T) {
func TestStoreCompareAndSwapPrevValueFailsIfNotMatch(t *testing.T) {
s := newStore()
var eidx uint64 = 1
s.Create("/foo", false, "bar", false, Permanent)
e, _err := s.CompareAndSwap("/foo", "wrong_value", 0, "baz", Permanent)
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
e, _err := s.CompareAndSwap("/foo", "wrong_value", 0, "baz", TTLOptionSet{ExpireTime: Permanent})
err := _err.(*etcdErr.Error)
assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "")
assert.Equal(t, err.Message, "Compare failed", "")
@ -575,8 +575,8 @@ func TestStoreCompareAndSwapPrevValueFailsIfNotMatch(t *testing.T) {
func TestStoreCompareAndSwapPrevIndex(t *testing.T) {
s := newStore()
var eidx uint64 = 2
s.Create("/foo", false, "bar", false, Permanent)
e, err := s.CompareAndSwap("/foo", "", 1, "baz", Permanent)
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
e, err := s.CompareAndSwap("/foo", "", 1, "baz", TTLOptionSet{ExpireTime: Permanent})
assert.Nil(t, err, "")
assert.Equal(t, e.EtcdIndex, eidx, "")
assert.Equal(t, e.Action, "compareAndSwap", "")
@ -597,8 +597,8 @@ func TestStoreCompareAndSwapPrevIndex(t *testing.T) {
func TestStoreCompareAndSwapPrevIndexFailsIfNotMatch(t *testing.T) {
s := newStore()
var eidx uint64 = 1
s.Create("/foo", false, "bar", false, Permanent)
e, _err := s.CompareAndSwap("/foo", "", 100, "baz", Permanent)
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
e, _err := s.CompareAndSwap("/foo", "", 100, "baz", TTLOptionSet{ExpireTime: Permanent})
err := _err.(*etcdErr.Error)
assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "")
assert.Equal(t, err.Message, "Compare failed", "")
@ -615,7 +615,7 @@ func TestStoreWatchCreate(t *testing.T) {
w, _ := s.Watch("/foo", false, false, 0)
c := w.EventChan()
assert.Equal(t, w.StartIndex(), eidx, "")
s.Create("/foo", false, "bar", false, Permanent)
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
eidx = 1
e := nbselect(c)
assert.Equal(t, e.EtcdIndex, eidx, "")
@ -632,7 +632,7 @@ func TestStoreWatchRecursiveCreate(t *testing.T) {
w, _ := s.Watch("/foo", true, false, 0)
assert.Equal(t, w.StartIndex(), eidx, "")
eidx = 1
s.Create("/foo/bar", false, "baz", false, Permanent)
s.Create("/foo/bar", false, "baz", false, TTLOptionSet{ExpireTime: Permanent})
e := nbselect(w.EventChan())
assert.Equal(t, e.EtcdIndex, eidx, "")
assert.Equal(t, e.Action, "create", "")
@ -643,11 +643,11 @@ func TestStoreWatchRecursiveCreate(t *testing.T) {
func TestStoreWatchUpdate(t *testing.T) {
s := newStore()
var eidx uint64 = 1
s.Create("/foo", false, "bar", false, Permanent)
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
w, _ := s.Watch("/foo", false, false, 0)
assert.Equal(t, w.StartIndex(), eidx, "")
eidx = 2
s.Update("/foo", "baz", Permanent)
s.Update("/foo", "baz", TTLOptionSet{ExpireTime: Permanent})
e := nbselect(w.EventChan())
assert.Equal(t, e.EtcdIndex, eidx, "")
assert.Equal(t, e.Action, "update", "")
@ -658,11 +658,11 @@ func TestStoreWatchUpdate(t *testing.T) {
func TestStoreWatchRecursiveUpdate(t *testing.T) {
s := newStore()
var eidx uint64 = 1
s.Create("/foo/bar", false, "baz", false, Permanent)
s.Create("/foo/bar", false, "baz", false, TTLOptionSet{ExpireTime: Permanent})
w, _ := s.Watch("/foo", true, false, 0)
assert.Equal(t, w.StartIndex(), eidx, "")
eidx = 2
s.Update("/foo/bar", "baz", Permanent)
s.Update("/foo/bar", "baz", TTLOptionSet{ExpireTime: Permanent})
e := nbselect(w.EventChan())
assert.Equal(t, e.EtcdIndex, eidx, "")
assert.Equal(t, e.Action, "update", "")
@ -673,7 +673,7 @@ func TestStoreWatchRecursiveUpdate(t *testing.T) {
func TestStoreWatchDelete(t *testing.T) {
s := newStore()
var eidx uint64 = 1
s.Create("/foo", false, "bar", false, Permanent)
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
w, _ := s.Watch("/foo", false, false, 0)
assert.Equal(t, w.StartIndex(), eidx, "")
eidx = 2
@ -688,7 +688,7 @@ func TestStoreWatchDelete(t *testing.T) {
func TestStoreWatchRecursiveDelete(t *testing.T) {
s := newStore()
var eidx uint64 = 1
s.Create("/foo/bar", false, "baz", false, Permanent)
s.Create("/foo/bar", false, "baz", false, TTLOptionSet{ExpireTime: Permanent})
w, _ := s.Watch("/foo", true, false, 0)
assert.Equal(t, w.StartIndex(), eidx, "")
eidx = 2
@ -703,11 +703,11 @@ func TestStoreWatchRecursiveDelete(t *testing.T) {
func TestStoreWatchCompareAndSwap(t *testing.T) {
s := newStore()
var eidx uint64 = 1
s.Create("/foo", false, "bar", false, Permanent)
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
w, _ := s.Watch("/foo", false, false, 0)
assert.Equal(t, w.StartIndex(), eidx, "")
eidx = 2
s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent)
s.CompareAndSwap("/foo", "bar", 0, "baz", TTLOptionSet{ExpireTime: Permanent})
e := nbselect(w.EventChan())
assert.Equal(t, e.EtcdIndex, eidx, "")
assert.Equal(t, e.Action, "compareAndSwap", "")
@ -718,11 +718,11 @@ func TestStoreWatchCompareAndSwap(t *testing.T) {
func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) {
s := newStore()
var eidx uint64 = 1
s.Create("/foo/bar", false, "baz", false, Permanent)
s.Create("/foo/bar", false, "baz", false, TTLOptionSet{ExpireTime: Permanent})
w, _ := s.Watch("/foo", true, false, 0)
assert.Equal(t, w.StartIndex(), eidx, "")
eidx = 2
s.CompareAndSwap("/foo/bar", "baz", 0, "bat", Permanent)
s.CompareAndSwap("/foo/bar", "baz", 0, "bat", TTLOptionSet{ExpireTime: Permanent})
e := nbselect(w.EventChan())
assert.Equal(t, e.EtcdIndex, eidx, "")
assert.Equal(t, e.Action, "compareAndSwap", "")
@ -736,8 +736,8 @@ func TestStoreWatchExpire(t *testing.T) {
s.clock = fc
var eidx uint64 = 2
s.Create("/foo", false, "bar", false, fc.Now().Add(500*time.Millisecond))
s.Create("/foofoo", false, "barbarbar", false, fc.Now().Add(500*time.Millisecond))
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond)})
s.Create("/foofoo", false, "barbarbar", false, TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond)})
w, _ := s.Watch("/", true, false, 0)
assert.Equal(t, w.StartIndex(), eidx, "")
@ -760,13 +760,95 @@ func TestStoreWatchExpire(t *testing.T) {
assert.Equal(t, e.Node.Key, "/foofoo", "")
}
// Ensure that the store can watch for key expiration when refreshing.
func TestStoreWatchExpireRefresh(t *testing.T) {
s := newStore()
fc := newFakeClock()
s.clock = fc
var eidx uint64 = 2
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond), Refresh: true})
s.Create("/foofoo", false, "barbarbar", false, TTLOptionSet{ExpireTime: fc.Now().Add(1200 * time.Millisecond), Refresh: true})
// Make sure we set watch updates when Refresh is true for newly created keys
w, _ := s.Watch("/", true, false, 0)
assert.Equal(t, w.StartIndex(), eidx, "")
c := w.EventChan()
e := nbselect(c)
assert.Nil(t, e, "")
fc.Advance(600 * time.Millisecond)
s.DeleteExpiredKeys(fc.Now())
eidx = 3
e = nbselect(c)
assert.Equal(t, e.EtcdIndex, eidx, "")
assert.Equal(t, e.Action, "expire", "")
assert.Equal(t, e.Node.Key, "/foo", "")
s.Update("/foofoo", "", TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond), Refresh: true})
w, _ = s.Watch("/", true, false, 4)
fc.Advance(700 * time.Millisecond)
s.DeleteExpiredKeys(fc.Now())
eidx = 5 // We should skip 4 because a TTL update should occur with no watch notification
assert.Equal(t, w.StartIndex(), eidx-1, "")
e = nbselect(w.EventChan())
assert.Equal(t, e.EtcdIndex, eidx, "")
assert.Equal(t, e.Action, "expire", "")
assert.Equal(t, e.Node.Key, "/foofoo", "")
}
// Ensure that the store can watch for key expiration when refreshing with an empty value.
func TestStoreWatchExpireEmptyRefresh(t *testing.T) {
s := newStore()
fc := newFakeClock()
s.clock = fc
var eidx uint64 = 1
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond), Refresh: true})
// Should be no-op
fc.Advance(200 * time.Millisecond)
s.DeleteExpiredKeys(fc.Now())
s.Update("/foo", "", TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond), Refresh: true})
w, _ := s.Watch("/", true, false, 2)
fc.Advance(700 * time.Millisecond)
s.DeleteExpiredKeys(fc.Now())
eidx = 3 // We should skip 2 because a TTL update should occur with no watch notification
assert.Equal(t, w.StartIndex(), eidx-1, "")
e := nbselect(w.EventChan())
assert.Equal(t, e.EtcdIndex, eidx, "")
assert.Equal(t, e.Action, "expire", "")
assert.Equal(t, e.Node.Key, "/foo", "")
assert.Equal(t, *e.PrevNode.Value, "bar", "")
}
// Ensure that the store can update the TTL on a value with refresh.
func TestStoreRefresh(t *testing.T) {
s := newStore()
fc := newFakeClock()
s.clock = fc
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond)})
s.Create("/bar", true, "bar", false, TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond)})
_, err := s.Update("/foo", "", TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond), Refresh: true})
assert.Nil(t, err, "")
_, err = s.Set("/foo", false, "", TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond), Refresh: true})
assert.Nil(t, err, "")
_, err = s.Update("/bar", "", TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond), Refresh: true})
assert.Nil(t, err, "")
_, err = s.CompareAndSwap("/foo", "bar", 0, "", TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond), Refresh: true})
assert.Nil(t, err, "")
}
// Ensure that the store can watch in streaming mode.
func TestStoreWatchStream(t *testing.T) {
s := newStore()
var eidx uint64 = 1
w, _ := s.Watch("/foo", false, true, 0)
// first modification
s.Create("/foo", false, "bar", false, Permanent)
s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
e := nbselect(w.EventChan())
assert.Equal(t, e.EtcdIndex, eidx, "")
assert.Equal(t, e.Action, "create", "")
@ -776,7 +858,7 @@ func TestStoreWatchStream(t *testing.T) {
assert.Nil(t, e, "")
// second modification
eidx = 2
s.Update("/foo", "baz", Permanent)
s.Update("/foo", "baz", TTLOptionSet{ExpireTime: Permanent})
e = nbselect(w.EventChan())
assert.Equal(t, e.EtcdIndex, eidx, "")
assert.Equal(t, e.Action, "update", "")
@ -790,10 +872,10 @@ func TestStoreWatchStream(t *testing.T) {
func TestStoreRecover(t *testing.T) {
s := newStore()
var eidx uint64 = 4
s.Create("/foo", true, "", false, Permanent)
s.Create("/foo/x", false, "bar", false, Permanent)
s.Update("/foo/x", "barbar", Permanent)
s.Create("/foo/y", false, "baz", false, Permanent)
s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent})
s.Create("/foo/x", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
s.Update("/foo/x", "barbar", TTLOptionSet{ExpireTime: Permanent})
s.Create("/foo/y", false, "baz", false, TTLOptionSet{ExpireTime: Permanent})
b, err := s.Save()
s2 := newStore()
@ -820,9 +902,9 @@ func TestStoreRecoverWithExpiration(t *testing.T) {
fc := newFakeClock()
var eidx uint64 = 4
s.Create("/foo", true, "", false, Permanent)
s.Create("/foo/x", false, "bar", false, Permanent)
s.Create("/foo/y", false, "baz", false, fc.Now().Add(5*time.Millisecond))
s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent})
s.Create("/foo/x", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
s.Create("/foo/y", false, "baz", false, TTLOptionSet{ExpireTime: fc.Now().Add(5 * time.Millisecond)})
b, err := s.Save()
time.Sleep(10 * time.Millisecond)
@ -850,7 +932,7 @@ func TestStoreWatchCreateWithHiddenKey(t *testing.T) {
s := newStore()
var eidx uint64 = 1
w, _ := s.Watch("/_foo", false, false, 0)
s.Create("/_foo", false, "bar", false, Permanent)
s.Create("/_foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
e := nbselect(w.EventChan())
assert.Equal(t, e.EtcdIndex, eidx, "")
assert.Equal(t, e.Action, "create", "")
@ -863,14 +945,14 @@ func TestStoreWatchCreateWithHiddenKey(t *testing.T) {
func TestStoreWatchRecursiveCreateWithHiddenKey(t *testing.T) {
s := newStore()
w, _ := s.Watch("/foo", true, false, 0)
s.Create("/foo/_bar", false, "baz", false, Permanent)
s.Create("/foo/_bar", false, "baz", false, TTLOptionSet{ExpireTime: Permanent})
e := nbselect(w.EventChan())
assert.Nil(t, e, "")
w, _ = s.Watch("/foo", true, false, 0)
s.Create("/foo/_baz", true, "", false, Permanent)
s.Create("/foo/_baz", true, "", false, TTLOptionSet{ExpireTime: Permanent})
e = nbselect(w.EventChan())
assert.Nil(t, e, "")
s.Create("/foo/_baz/quux", false, "quux", false, Permanent)
s.Create("/foo/_baz/quux", false, "quux", false, TTLOptionSet{ExpireTime: Permanent})
e = nbselect(w.EventChan())
assert.Nil(t, e, "")
}
@ -878,9 +960,9 @@ func TestStoreWatchRecursiveCreateWithHiddenKey(t *testing.T) {
// Ensure that the store doesn't see hidden key updates.
func TestStoreWatchUpdateWithHiddenKey(t *testing.T) {
s := newStore()
s.Create("/_foo", false, "bar", false, Permanent)
s.Create("/_foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
w, _ := s.Watch("/_foo", false, false, 0)
s.Update("/_foo", "baz", Permanent)
s.Update("/_foo", "baz", TTLOptionSet{ExpireTime: Permanent})
e := nbselect(w.EventChan())
assert.Equal(t, e.Action, "update", "")
assert.Equal(t, e.Node.Key, "/_foo", "")
@ -891,9 +973,9 @@ func TestStoreWatchUpdateWithHiddenKey(t *testing.T) {
// Ensure that the store doesn't see hidden key updates without an exact path match in recursive mode.
func TestStoreWatchRecursiveUpdateWithHiddenKey(t *testing.T) {
s := newStore()
s.Create("/foo/_bar", false, "baz", false, Permanent)
s.Create("/foo/_bar", false, "baz", false, TTLOptionSet{ExpireTime: Permanent})
w, _ := s.Watch("/foo", true, false, 0)
s.Update("/foo/_bar", "baz", Permanent)
s.Update("/foo/_bar", "baz", TTLOptionSet{ExpireTime: Permanent})
e := nbselect(w.EventChan())
assert.Nil(t, e, "")
}
@ -902,7 +984,7 @@ func TestStoreWatchRecursiveUpdateWithHiddenKey(t *testing.T) {
func TestStoreWatchDeleteWithHiddenKey(t *testing.T) {
s := newStore()
var eidx uint64 = 2
s.Create("/_foo", false, "bar", false, Permanent)
s.Create("/_foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent})
w, _ := s.Watch("/_foo", false, false, 0)
s.Delete("/_foo", false, false)
e := nbselect(w.EventChan())
@ -916,7 +998,7 @@ func TestStoreWatchDeleteWithHiddenKey(t *testing.T) {
// Ensure that the store doesn't see hidden key deletes without an exact path match in recursive mode.
func TestStoreWatchRecursiveDeleteWithHiddenKey(t *testing.T) {
s := newStore()
s.Create("/foo/_bar", false, "baz", false, Permanent)
s.Create("/foo/_bar", false, "baz", false, TTLOptionSet{ExpireTime: Permanent})
w, _ := s.Watch("/foo", true, false, 0)
s.Delete("/foo/_bar", false, false)
e := nbselect(w.EventChan())
@ -929,8 +1011,8 @@ func TestStoreWatchExpireWithHiddenKey(t *testing.T) {
fc := newFakeClock()
s.clock = fc
s.Create("/_foo", false, "bar", false, fc.Now().Add(500*time.Millisecond))
s.Create("/foofoo", false, "barbarbar", false, fc.Now().Add(1000*time.Millisecond))
s.Create("/_foo", false, "bar", false, TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond)})
s.Create("/foofoo", false, "barbarbar", false, TTLOptionSet{ExpireTime: fc.Now().Add(1000 * time.Millisecond)})
w, _ := s.Watch("/", true, false, 0)
c := w.EventChan()
@ -952,7 +1034,7 @@ func TestStoreWatchRecursiveCreateDeeperThanHiddenKey(t *testing.T) {
s := newStore()
var eidx uint64 = 1
w, _ := s.Watch("/_foo/bar", true, false, 0)
s.Create("/_foo/bar/baz", false, "baz", false, Permanent)
s.Create("/_foo/bar/baz", false, "baz", false, TTLOptionSet{ExpireTime: Permanent})
e := nbselect(w.EventChan())
assert.NotNil(t, e, "")
@ -970,10 +1052,10 @@ func TestStoreWatchRecursiveCreateDeeperThanHiddenKey(t *testing.T) {
// to operate correctly.
func TestStoreWatchSlowConsumer(t *testing.T) {
s := newStore()
s.Watch("/foo", true, true, 0) // stream must be true
s.Set("/foo", false, "1", Permanent) // ok
s.Set("/foo", false, "2", Permanent) // ok
s.Set("/foo", false, "3", Permanent) // must not panic
s.Watch("/foo", true, true, 0) // stream must be true
s.Set("/foo", false, "1", TTLOptionSet{ExpireTime: Permanent}) // ok
s.Set("/foo", false, "2", TTLOptionSet{ExpireTime: Permanent}) // ok
s.Set("/foo", false, "3", TTLOptionSet{ExpireTime: Permanent}) // must not panic
}
// Performs a non-blocking select on an event channel.