Merge branch '0.2' of https://github.com/coreos/etcd into migration-test

This commit is contained in:
Ben Johnson 2013-11-12 17:29:58 -05:00
commit ccc27a61f5
40 changed files with 728 additions and 506 deletions

View File

@ -80,16 +80,14 @@ type Error struct {
Message string `json:"message"` Message string `json:"message"`
Cause string `json:"cause,omitempty"` Cause string `json:"cause,omitempty"`
Index uint64 `json:"index"` Index uint64 `json:"index"`
Term uint64 `json:"term"`
} }
func NewError(errorCode int, cause string, index uint64, term uint64) *Error { func NewError(errorCode int, cause string, index uint64) *Error {
return &Error{ return &Error{
ErrorCode: errorCode, ErrorCode: errorCode,
Message: errors[errorCode], Message: errors[errorCode],
Cause: cause, Cause: cause,
Index: index, Index: index,
Term: term,
} }
} }
@ -109,7 +107,6 @@ func (e Error) toJsonString() string {
func (e Error) Write(w http.ResponseWriter) { func (e Error) Write(w http.ResponseWriter) {
w.Header().Add("X-Etcd-Index", fmt.Sprint(e.Index)) w.Header().Add("X-Etcd-Index", fmt.Sprint(e.Index))
w.Header().Add("X-Etcd-Term", fmt.Sprint(e.Term))
// 3xx is reft internal error // 3xx is reft internal error
if e.ErrorCode/100 == 3 { if e.ErrorCode/100 == 3 {
http.Error(w, e.toJsonString(), http.StatusInternalServerError) http.Error(w, e.toJsonString(), http.StatusInternalServerError)

View File

@ -54,11 +54,11 @@ func (c *JoinCommand) Apply(server raft.Server) (interface{}, error) {
// Check machine number in the cluster // Check machine number in the cluster
if ps.registry.Count() == ps.MaxClusterSize { if ps.registry.Count() == ps.MaxClusterSize {
log.Debug("Reject join request from ", c.Name) log.Debug("Reject join request from ", c.Name)
return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMoreMachine, "", server.CommitIndex(), server.Term()) return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMoreMachine, "", server.CommitIndex())
} }
// Add to shared machine registry. // Add to shared machine registry.
ps.registry.Register(c.Name, c.RaftURL, c.EtcdURL, server.CommitIndex(), server.Term()) ps.registry.Register(c.Name, c.RaftURL, c.EtcdURL)
// Add peer in raft // Add peer in raft
err := server.AddPeer(c.Name, "") err := server.AddPeer(c.Name, "")

View File

@ -136,6 +136,8 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error {
log.Debugf("%s restart as a follower", s.name) log.Debugf("%s restart as a follower", s.name)
} }
go s.monitorSync()
// open the snapshot // open the snapshot
if snapshot { if snapshot {
go s.monitorSnapshot() go s.monitorSnapshot()
@ -424,3 +426,15 @@ func (s *PeerServer) monitorSnapshot() {
} }
} }
} }
func (s *PeerServer) monitorSync() {
ticker := time.Tick(time.Millisecond * 500)
for {
select {
case now := <-ticker:
if s.raftServer.State() == raft.Leader {
s.raftServer.Do(s.store.CommandFactory().CreateSyncCommand(now))
}
}
}
}

View File

@ -38,20 +38,20 @@ func NewRegistry(s store.Store) *Registry {
} }
// Adds a node to the registry. // Adds a node to the registry.
func (r *Registry) Register(name string, peerURL string, url string, commitIndex uint64, term uint64) error { func (r *Registry) Register(name string, peerURL string, url string) error {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
// Write data to store. // Write data to store.
key := path.Join(RegistryKey, name) key := path.Join(RegistryKey, name)
value := fmt.Sprintf("raft=%s&etcd=%s", peerURL, url) value := fmt.Sprintf("raft=%s&etcd=%s", peerURL, url)
_, err := r.store.Create(key, value, false, store.Permanent, commitIndex, term) _, err := r.store.Create(key, value, false, store.Permanent)
log.Debugf("Register: %s", name) log.Debugf("Register: %s", name)
return err return err
} }
// Removes a node from the registry. // Removes a node from the registry.
func (r *Registry) Unregister(name string, commitIndex uint64, term uint64) error { func (r *Registry) Unregister(name string) error {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
@ -59,14 +59,14 @@ func (r *Registry) Unregister(name string, commitIndex uint64, term uint64) erro
// delete(r.nodes, name) // delete(r.nodes, name)
// Remove the key from the store. // Remove the key from the store.
_, err := r.store.Delete(path.Join(RegistryKey, name), false, commitIndex, term) _, err := r.store.Delete(path.Join(RegistryKey, name), false)
log.Debugf("Unregister: %s", name) log.Debugf("Unregister: %s", name)
return err return err
} }
// Returns the number of nodes in the cluster. // Returns the number of nodes in the cluster.
func (r *Registry) Count() int { func (r *Registry) Count() int {
e, err := r.store.Get(RegistryKey, false, false, 0, 0) e, err := r.store.Get(RegistryKey, false, false)
if err != nil { if err != nil {
return 0 return 0
} }
@ -133,7 +133,7 @@ func (r *Registry) urls(leaderName, selfName string, url func(name string) (stri
} }
// Retrieve a list of all nodes. // Retrieve a list of all nodes.
if e, _ := r.store.Get(RegistryKey, false, false, 0, 0); e != nil { if e, _ := r.store.Get(RegistryKey, false, false); e != nil {
// Lookup the URL for each one. // Lookup the URL for each one.
for _, pair := range e.KVPairs { for _, pair := range e.KVPairs {
_, name := filepath.Split(pair.Key) _, name := filepath.Split(pair.Key)
@ -160,7 +160,7 @@ func (r *Registry) load(name string) {
} }
// Retrieve from store. // Retrieve from store.
e, err := r.store.Get(path.Join(RegistryKey, name), false, false, 0, 0) e, err := r.store.Get(path.Join(RegistryKey, name), false, false)
if err != nil { if err != nil {
return return
} }

View File

@ -27,7 +27,7 @@ func (c *RemoveCommand) Apply(server raft.Server) (interface{}, error) {
ps, _ := server.Context().(*PeerServer) ps, _ := server.Context().(*PeerServer)
// Remove node from the shared registry. // Remove node from the shared registry.
err := ps.registry.Unregister(c.Name, server.CommitIndex(), server.Term()) err := ps.registry.Unregister(c.Name)
// Delete from stats // Delete from stats
delete(ps.followersStats.Followers, c.Name) delete(ps.followersStats.Followers, c.Name)

View File

@ -232,6 +232,7 @@ func (s *Server) Close() {
} }
} }
// Dispatch command to the current leader
func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error { func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error {
ps := s.peerServer ps := s.peerServer
if ps.raftServer.State() == raft.Leader { if ps.raftServer.State() == raft.Leader {
@ -241,7 +242,7 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque
} }
if result == nil { if result == nil {
return etcdErr.NewError(300, "Empty result from raft", store.UndefIndex, store.UndefTerm) return etcdErr.NewError(300, "Empty result from raft", s.Store().Index())
} }
// response for raft related commands[join/remove] // response for raft related commands[join/remove]
@ -259,6 +260,12 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque
e, _ := result.(*store.Event) e, _ := result.(*store.Event)
b, _ = json.Marshal(e) b, _ = json.Marshal(e)
// etcd index should be the same as the event index
// which is also the last modified index of the node
w.Header().Add("X-Etcd-Index", fmt.Sprint(e.Index))
w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex()))
w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term()))
if e.IsCreated() { if e.IsCreated() {
w.WriteHeader(http.StatusCreated) w.WriteHeader(http.StatusCreated)
} else { } else {
@ -275,7 +282,7 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque
// No leader available. // No leader available.
if leader == "" { if leader == "" {
return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm) return etcdErr.NewError(300, "", s.Store().Index())
} }
var url string var url string
@ -324,7 +331,7 @@ func (s *Server) GetVersionHandler(w http.ResponseWriter, req *http.Request) err
func (s *Server) GetLeaderHandler(w http.ResponseWriter, req *http.Request) error { func (s *Server) GetLeaderHandler(w http.ResponseWriter, req *http.Request) error {
leader := s.peerServer.RaftServer().Leader() leader := s.peerServer.RaftServer().Leader()
if leader == "" { if leader == "" {
return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", store.UndefIndex, store.UndefTerm) return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", s.Store().Index())
} }
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
url, _ := s.registry.PeerURL(leader) url, _ := s.registry.PeerURL(leader)
@ -355,7 +362,7 @@ func (s *Server) GetLeaderStatsHandler(w http.ResponseWriter, req *http.Request)
leader := s.peerServer.RaftServer().Leader() leader := s.peerServer.RaftServer().Leader()
if leader == "" { if leader == "" {
return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm) return etcdErr.NewError(300, "", s.Store().Index())
} }
hostname, _ := s.registry.ClientURL(leader) hostname, _ := s.registry.ClientURL(leader)
redirect(hostname, w, req) redirect(hostname, w, req)

View File

@ -13,7 +13,7 @@ func GetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
key := vars["key"] key := vars["key"]
// Retrieve the key from the store. // Retrieve the key from the store.
event, err := s.Store().Get(key, false, false, s.CommitIndex(), s.Term()) event, err := s.Store().Get(key, false, false)
if err != nil { if err != nil {
return err return err
} }

View File

@ -19,13 +19,13 @@ func SetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
// Parse non-blank value. // Parse non-blank value.
value := req.Form.Get("value") value := req.Form.Get("value")
if len(value) == 0 { if len(value) == 0 {
return etcdErr.NewError(200, "Set", store.UndefIndex, store.UndefTerm) return etcdErr.NewError(200, "Set", s.Store().Index())
} }
// Convert time-to-live to an expiration time. // Convert time-to-live to an expiration time.
expireTime, err := store.TTL(req.Form.Get("ttl")) expireTime, err := store.TTL(req.Form.Get("ttl"))
if err != nil { if err != nil {
return etcdErr.NewError(202, "Set", store.UndefIndex, store.UndefTerm) return etcdErr.NewError(202, "Set", s.Store().Index())
} }
// If the "prevValue" is specified then test-and-set. Otherwise create a new key. // If the "prevValue" is specified then test-and-set. Otherwise create a new key.

View File

@ -6,7 +6,6 @@ import (
"strconv" "strconv"
etcdErr "github.com/coreos/etcd/error" etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/store"
"github.com/gorilla/mux" "github.com/gorilla/mux"
) )
@ -21,14 +20,14 @@ func WatchKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
if req.Method == "POST" { if req.Method == "POST" {
sinceIndex, err = strconv.ParseUint(string(req.FormValue("index")), 10, 64) sinceIndex, err = strconv.ParseUint(string(req.FormValue("index")), 10, 64)
if err != nil { if err != nil {
return etcdErr.NewError(203, "Watch From Index", store.UndefIndex, store.UndefTerm) return etcdErr.NewError(203, "Watch From Index", s.Store().Index())
} }
} }
// Start the watcher on the store. // Start the watcher on the store.
c, err := s.Store().Watch(key, false, sinceIndex, s.CommitIndex(), s.Term()) c, err := s.Store().Watch(key, false, sinceIndex)
if err != nil { if err != nil {
return etcdErr.NewError(500, key, store.UndefIndex, store.UndefTerm) return etcdErr.NewError(500, key, s.Store().Index())
} }
event := <-c event := <-c

View File

@ -41,14 +41,14 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
if waitIndex != "" { if waitIndex != "" {
sinceIndex, err = strconv.ParseUint(string(req.FormValue("waitIndex")), 10, 64) sinceIndex, err = strconv.ParseUint(string(req.FormValue("waitIndex")), 10, 64)
if err != nil { if err != nil {
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", store.UndefIndex, store.UndefTerm) return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", s.Store().Index())
} }
} }
// Start the watcher on the store. // Start the watcher on the store.
eventChan, err := s.Store().Watch(key, recursive, sinceIndex, s.CommitIndex(), s.Term()) eventChan, err := s.Store().Watch(key, recursive, sinceIndex)
if err != nil { if err != nil {
return etcdErr.NewError(500, key, store.UndefIndex, store.UndefTerm) return etcdErr.NewError(500, key, s.Store().Index())
} }
cn, _ := w.(http.CloseNotifier) cn, _ := w.(http.CloseNotifier)
@ -62,17 +62,18 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
} else { //get } else { //get
// Retrieve the key from the store. // Retrieve the key from the store.
event, err = s.Store().Get(key, recursive, sorted, s.CommitIndex(), s.Term()) event, err = s.Store().Get(key, recursive, sorted)
if err != nil { if err != nil {
return err return err
} }
} }
w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index)) w.Header().Add("X-Etcd-Index", fmt.Sprint(s.Store().Index()))
w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term)) w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex()))
w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term()))
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
b, _ := json.Marshal(event) b, _ := json.Marshal(event)
w.Write(b) w.Write(b)
return nil return nil

View File

@ -15,7 +15,7 @@ func PostHandler(w http.ResponseWriter, req *http.Request, s Server) error {
value := req.FormValue("value") value := req.FormValue("value")
expireTime, err := store.TTL(req.FormValue("ttl")) expireTime, err := store.TTL(req.FormValue("ttl"))
if err != nil { if err != nil {
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", store.UndefIndex, store.UndefTerm) return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", s.Store().Index())
} }
c := s.Store().CommandFactory().CreateCreateCommand(key, value, expireTime, true) c := s.Store().CommandFactory().CreateCreateCommand(key, value, expireTime, true)

View File

@ -22,7 +22,7 @@ func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error {
value := req.Form.Get("value") value := req.Form.Get("value")
expireTime, err := store.TTL(req.Form.Get("ttl")) expireTime, err := store.TTL(req.Form.Get("ttl"))
if err != nil { if err != nil {
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm) return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", s.Store().Index())
} }
_, valueOk := req.Form["prevValue"] _, valueOk := req.Form["prevValue"]
@ -59,7 +59,7 @@ func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error {
// bad previous index // bad previous index
if err != nil { if err != nil {
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndSwap", store.UndefIndex, store.UndefTerm) return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndSwap", s.Store().Index())
} }
} else { } else {
prevIndex = 0 prevIndex = 0
@ -67,7 +67,7 @@ func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error {
if valueOk { if valueOk {
if prevValue == "" { if prevValue == "" {
return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndSwap", store.UndefIndex, store.UndefTerm) return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndSwap", s.Store().Index())
} }
} }
@ -88,7 +88,7 @@ func CreateHandler(w http.ResponseWriter, req *http.Request, s Server, key, valu
func UpdateHandler(w http.ResponseWriter, req *http.Request, s Server, key, value string, expireTime time.Time) error { func UpdateHandler(w http.ResponseWriter, req *http.Request, s Server, key, value string, expireTime time.Time) error {
// Update should give at least one option // Update should give at least one option
if value == "" && expireTime.Sub(store.Permanent) == 0 { if value == "" && expireTime.Sub(store.Permanent) == 0 {
return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm) return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", s.Store().Index())
} }
c := s.Store().CommandFactory().CreateUpdateCommand(key, value, expireTime) c := s.Store().CommandFactory().CreateUpdateCommand(key, value, expireTime)

View File

@ -24,6 +24,6 @@ func TestV2DeleteKey(t *testing.T) {
resp, err = tests.DeleteForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), url.Values{}) resp, err = tests.DeleteForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), url.Values{})
body := tests.ReadBody(resp) body := tests.ReadBody(resp)
assert.Nil(t, err, "") assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"delete","key":"/foo/bar","prevValue":"XXX","index":4,"term":0}`, "") assert.Equal(t, string(body), `{"action":"delete","key":"/foo/bar","prevValue":"XXX","modifiedIndex":2}`, "")
}) })
} }

View File

@ -27,8 +27,7 @@ func TestV2GetKey(t *testing.T) {
assert.Equal(t, body["action"], "get", "") assert.Equal(t, body["action"], "get", "")
assert.Equal(t, body["key"], "/foo/bar", "") assert.Equal(t, body["key"], "/foo/bar", "")
assert.Equal(t, body["value"], "XXX", "") assert.Equal(t, body["value"], "XXX", "")
assert.Equal(t, body["index"], 3, "") assert.Equal(t, body["modifiedIndex"], 1, "")
assert.Equal(t, body["term"], 0, "")
}) })
} }
@ -55,7 +54,7 @@ func TestV2GetKeyRecursively(t *testing.T) {
assert.Equal(t, body["action"], "get", "") assert.Equal(t, body["action"], "get", "")
assert.Equal(t, body["key"], "/foo", "") assert.Equal(t, body["key"], "/foo", "")
assert.Equal(t, body["dir"], true, "") assert.Equal(t, body["dir"], true, "")
assert.Equal(t, body["index"], 4, "") assert.Equal(t, body["modifiedIndex"], 1, "")
assert.Equal(t, len(body["kvs"].([]interface{})), 2, "") assert.Equal(t, len(body["kvs"].([]interface{})), 2, "")
kv0 := body["kvs"].([]interface{})[0].(map[string]interface{}) kv0 := body["kvs"].([]interface{})[0].(map[string]interface{})
@ -81,9 +80,11 @@ func TestV2GetKeyRecursively(t *testing.T) {
func TestV2WatchKey(t *testing.T) { func TestV2WatchKey(t *testing.T) {
tests.RunServer(func(s *server.Server) { tests.RunServer(func(s *server.Server) {
var body map[string]interface{} var body map[string]interface{}
c := make(chan bool)
go func() { go func() {
resp, _ := tests.Get(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar?wait=true")) resp, _ := tests.Get(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar?wait=true"))
body = tests.ReadBodyJSON(resp) body = tests.ReadBodyJSON(resp)
c <- true
}() }()
// Make sure response didn't fire early. // Make sure response didn't fire early.
@ -98,12 +99,19 @@ func TestV2WatchKey(t *testing.T) {
// A response should follow from the GET above. // A response should follow from the GET above.
time.Sleep(1 * time.Millisecond) time.Sleep(1 * time.Millisecond)
select {
case <-c:
default:
t.Fatal("cannot get watch result")
}
assert.NotNil(t, body, "") assert.NotNil(t, body, "")
assert.Equal(t, body["action"], "set", "") assert.Equal(t, body["action"], "set", "")
assert.Equal(t, body["key"], "/foo/bar", "") assert.Equal(t, body["key"], "/foo/bar", "")
assert.Equal(t, body["value"], "XXX", "") assert.Equal(t, body["value"], "XXX", "")
assert.Equal(t, body["index"], 3, "") assert.Equal(t, body["modifiedIndex"], 1, "")
assert.Equal(t, body["term"], 0, "")
}) })
} }
@ -118,7 +126,7 @@ func TestV2WatchKeyWithIndex(t *testing.T) {
var body map[string]interface{} var body map[string]interface{}
c := make(chan bool) c := make(chan bool)
go func() { go func() {
resp, _ := tests.Get(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar?wait=true&waitIndex=5")) resp, _ := tests.Get(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar?wait=true&waitIndex=2"))
body = tests.ReadBodyJSON(resp) body = tests.ReadBodyJSON(resp)
c <- true c <- true
}() }()
@ -156,7 +164,6 @@ func TestV2WatchKeyWithIndex(t *testing.T) {
assert.Equal(t, body["action"], "set", "") assert.Equal(t, body["action"], "set", "")
assert.Equal(t, body["key"], "/foo/bar", "") assert.Equal(t, body["key"], "/foo/bar", "")
assert.Equal(t, body["value"], "YYY", "") assert.Equal(t, body["value"], "YYY", "")
assert.Equal(t, body["index"], 4, "") assert.Equal(t, body["modifiedIndex"], 2, "")
assert.Equal(t, body["term"], 0, "")
}) })
} }

View File

@ -21,18 +21,18 @@ func TestV2CreateUnique(t *testing.T) {
resp, _ := tests.PostForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), nil) resp, _ := tests.PostForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), nil)
body := tests.ReadBodyJSON(resp) body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "create", "") assert.Equal(t, body["action"], "create", "")
assert.Equal(t, body["key"], "/foo/bar/3", "") assert.Equal(t, body["key"], "/foo/bar/1", "")
assert.Equal(t, body["dir"], true, "") assert.Equal(t, body["dir"], true, "")
assert.Equal(t, body["index"], 3, "") assert.Equal(t, body["modifiedIndex"], 1, "")
// Second POST should add next index to list. // Second POST should add next index to list.
resp, _ = tests.PostForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), nil) resp, _ = tests.PostForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), nil)
body = tests.ReadBodyJSON(resp) body = tests.ReadBodyJSON(resp)
assert.Equal(t, body["key"], "/foo/bar/4", "") assert.Equal(t, body["key"], "/foo/bar/2", "")
// POST to a different key should add index to that list. // POST to a different key should add index to that list.
resp, _ = tests.PostForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/baz"), nil) resp, _ = tests.PostForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/baz"), nil)
body = tests.ReadBodyJSON(resp) body = tests.ReadBodyJSON(resp)
assert.Equal(t, body["key"], "/foo/baz/5", "") assert.Equal(t, body["key"], "/foo/baz/3", "")
}) })
} }

View File

@ -22,7 +22,7 @@ func TestV2SetKey(t *testing.T) {
resp, err := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v) resp, err := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
body := tests.ReadBody(resp) body := tests.ReadBody(resp)
assert.Nil(t, err, "") assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"set","key":"/foo/bar","value":"XXX","index":3,"term":0}`, "") assert.Equal(t, string(body), `{"action":"set","key":"/foo/bar","value":"XXX","modifiedIndex":1}`, "")
}) })
} }
@ -160,7 +160,7 @@ func TestV2UpdateKeyFailOnMissingDirectory(t *testing.T) {
// Ensures that a key is set only if the previous index matches. // Ensures that a key is set only if the previous index matches.
// //
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX // $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevIndex=3 // $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevIndex=1
// //
func TestV2SetKeyCASOnIndexSuccess(t *testing.T) { func TestV2SetKeyCASOnIndexSuccess(t *testing.T) {
tests.RunServer(func(s *server.Server) { tests.RunServer(func(s *server.Server) {
@ -169,13 +169,13 @@ func TestV2SetKeyCASOnIndexSuccess(t *testing.T) {
resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v) resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
tests.ReadBody(resp) tests.ReadBody(resp)
v.Set("value", "YYY") v.Set("value", "YYY")
v.Set("prevIndex", "3") v.Set("prevIndex", "1")
resp, _ = tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v) resp, _ = tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
body := tests.ReadBodyJSON(resp) body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "compareAndSwap", "") assert.Equal(t, body["action"], "compareAndSwap", "")
assert.Equal(t, body["prevValue"], "XXX", "") assert.Equal(t, body["prevValue"], "XXX", "")
assert.Equal(t, body["value"], "YYY", "") assert.Equal(t, body["value"], "YYY", "")
assert.Equal(t, body["index"], 4, "") assert.Equal(t, body["modifiedIndex"], 2, "")
}) })
} }
@ -196,8 +196,8 @@ func TestV2SetKeyCASOnIndexFail(t *testing.T) {
body := tests.ReadBodyJSON(resp) body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 101, "") assert.Equal(t, body["errorCode"], 101, "")
assert.Equal(t, body["message"], "Test Failed", "") assert.Equal(t, body["message"], "Test Failed", "")
assert.Equal(t, body["cause"], "[ != XXX] [10 != 3]", "") assert.Equal(t, body["cause"], "[ != XXX] [10 != 1]", "")
assert.Equal(t, body["index"], 4, "") assert.Equal(t, body["index"], 1, "")
}) })
} }
@ -236,7 +236,7 @@ func TestV2SetKeyCASOnValueSuccess(t *testing.T) {
assert.Equal(t, body["action"], "compareAndSwap", "") assert.Equal(t, body["action"], "compareAndSwap", "")
assert.Equal(t, body["prevValue"], "XXX", "") assert.Equal(t, body["prevValue"], "XXX", "")
assert.Equal(t, body["value"], "YYY", "") assert.Equal(t, body["value"], "YYY", "")
assert.Equal(t, body["index"], 4, "") assert.Equal(t, body["modifiedIndex"], 2, "")
}) })
} }
@ -257,8 +257,8 @@ func TestV2SetKeyCASOnValueFail(t *testing.T) {
body := tests.ReadBodyJSON(resp) body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 101, "") assert.Equal(t, body["errorCode"], 101, "")
assert.Equal(t, body["message"], "Test Failed", "") assert.Equal(t, body["message"], "Test Failed", "")
assert.Equal(t, body["cause"], "[AAA != XXX] [0 != 3]", "") assert.Equal(t, body["cause"], "[AAA != XXX] [0 != 1]", "")
assert.Equal(t, body["index"], 4, "") assert.Equal(t, body["index"], 1, "")
}) })
} }

View File

@ -21,6 +21,7 @@ type CommandFactory interface {
CreateUpdateCommand(key string, value string, expireTime time.Time) raft.Command CreateUpdateCommand(key string, value string, expireTime time.Time) raft.Command
CreateDeleteCommand(key string, recursive bool) raft.Command CreateDeleteCommand(key string, recursive bool) raft.Command
CreateCompareAndSwapCommand(key string, value string, prevValue string, prevIndex uint64, expireTime time.Time) raft.Command CreateCompareAndSwapCommand(key string, value string, prevValue string, prevIndex uint64, expireTime time.Time) raft.Command
CreateSyncCommand(now time.Time) raft.Command
} }
// RegisterCommandFactory adds a command factory to the global registry. // RegisterCommandFactory adds a command factory to the global registry.

View File

@ -14,11 +14,6 @@ const (
Expire = "expire" Expire = "expire"
) )
const (
UndefIndex = 0
UndefTerm = 0
)
type Event struct { type Event struct {
Action string `json:"action"` Action string `json:"action"`
Key string `json:"key, omitempty"` Key string `json:"key, omitempty"`
@ -28,17 +23,14 @@ type Event struct {
KVPairs kvPairs `json:"kvs,omitempty"` KVPairs kvPairs `json:"kvs,omitempty"`
Expiration *time.Time `json:"expiration,omitempty"` Expiration *time.Time `json:"expiration,omitempty"`
TTL int64 `json:"ttl,omitempty"` // Time to live in second TTL int64 `json:"ttl,omitempty"` // Time to live in second
// The command index of the raft machine when the command is executed ModifiedIndex uint64 `json:"modifiedIndex"`
Index uint64 `json:"index"`
Term uint64 `json:"term"`
} }
func newEvent(action string, key string, index uint64, term uint64) *Event { func newEvent(action string, key string, index uint64) *Event {
return &Event{ return &Event{
Action: action, Action: action,
Key: key, Key: key,
Index: index, ModifiedIndex: index,
Term: term,
} }
} }
@ -54,6 +46,10 @@ func (e *Event) IsCreated() bool {
return false return false
} }
func (e *Event) Index() uint64 {
return e.ModifiedIndex
}
// Converts an event object into a response object. // Converts an event object into a response object.
func (event *Event) Response() interface{} { func (event *Event) Response() interface{} {
if !event.Dir { if !event.Dir {
@ -62,7 +58,7 @@ func (event *Event) Response() interface{} {
Key: event.Key, Key: event.Key,
Value: event.Value, Value: event.Value,
PrevValue: event.PrevValue, PrevValue: event.PrevValue,
Index: event.Index, Index: event.ModifiedIndex,
TTL: event.TTL, TTL: event.TTL,
Expiration: event.Expiration, Expiration: event.Expiration,
} }
@ -87,7 +83,7 @@ func (event *Event) Response() interface{} {
Key: kv.Key, Key: kv.Key,
Value: kv.Value, Value: kv.Value,
Dir: kv.Dir, Dir: kv.Dir,
Index: event.Index, Index: event.ModifiedIndex,
} }
} }
return responses return responses

View File

@ -12,8 +12,6 @@ type EventHistory struct {
Queue eventQueue Queue eventQueue
StartIndex uint64 StartIndex uint64
LastIndex uint64 LastIndex uint64
LastTerm uint64
DupCnt uint64 // help to compute the watching point with duplicated indexes in the queue
rwl sync.RWMutex rwl sync.RWMutex
} }
@ -31,21 +29,11 @@ func (eh *EventHistory) addEvent(e *Event) *Event {
eh.rwl.Lock() eh.rwl.Lock()
defer eh.rwl.Unlock() defer eh.rwl.Unlock()
var duped uint64
if e.Index == UndefIndex {
e.Index = eh.LastIndex
e.Term = eh.LastTerm
duped = 1
}
eh.Queue.insert(e) eh.Queue.insert(e)
eh.LastIndex = e.Index eh.LastIndex = e.Index()
eh.LastTerm = e.Term
eh.DupCnt += duped
eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index eh.StartIndex = eh.Queue.Events[eh.Queue.Front].ModifiedIndex
return e return e
} }
@ -56,32 +44,31 @@ func (eh *EventHistory) scan(prefix string, index uint64) (*Event, *etcdErr.Erro
eh.rwl.RLock() eh.rwl.RLock()
defer eh.rwl.RUnlock() defer eh.rwl.RUnlock()
start := index - eh.StartIndex
// the index should locate after the event history's StartIndex // the index should locate after the event history's StartIndex
if start < 0 { if index-eh.StartIndex < 0 {
return nil, return nil,
etcdErr.NewError(etcdErr.EcodeEventIndexCleared, etcdErr.NewError(etcdErr.EcodeEventIndexCleared,
fmt.Sprintf("the requested history has been cleared [%v/%v]", fmt.Sprintf("the requested history has been cleared [%v/%v]",
eh.StartIndex, index), UndefIndex, UndefTerm) eh.StartIndex, index), 0)
} }
// the index should locate before the size of the queue minus the duplicate count // the index should locate before the size of the queue minus the duplicate count
if start >= (uint64(eh.Queue.Size) - eh.DupCnt) { // future index if index > eh.LastIndex { // future index
return nil, nil return nil, nil
} }
i := int((start + uint64(eh.Queue.Front)) % uint64(eh.Queue.Capacity)) i := eh.Queue.Front
for { for {
e := eh.Queue.Events[i] e := eh.Queue.Events[i]
if strings.HasPrefix(e.Key, prefix) && index <= e.Index { // make sure we bypass the smaller one
if strings.HasPrefix(e.Key, prefix) && index <= e.Index() { // make sure we bypass the smaller one
return e, nil return e, nil
} }
i = (i + 1) % eh.Queue.Capacity i = (i + 1) % eh.Queue.Capacity
if i == eh.Queue.back() { // find nothing, return and watch from current index if i > eh.Queue.back() {
return nil, nil return nil, nil
} }
} }
@ -105,8 +92,6 @@ func (eh *EventHistory) clone() *EventHistory {
StartIndex: eh.StartIndex, StartIndex: eh.StartIndex,
Queue: clonedQueue, Queue: clonedQueue,
LastIndex: eh.LastIndex, LastIndex: eh.LastIndex,
LastTerm: eh.LastTerm,
DupCnt: eh.DupCnt,
} }
} }

View File

@ -13,7 +13,7 @@ func TestEventQueue(t *testing.T) {
// Add // Add
for i := 0; i < 200; i++ { for i := 0; i < 200; i++ {
e := newEvent(Create, "/foo", uint64(i), 1) e := newEvent(Create, "/foo", uint64(i))
eh.addEvent(e) eh.addEvent(e)
} }
@ -23,7 +23,7 @@ func TestEventQueue(t *testing.T) {
n := eh.Queue.Size n := eh.Queue.Size
for ; n > 0; n-- { for ; n > 0; n-- {
e := eh.Queue.Events[i] e := eh.Queue.Events[i]
if e.Index != uint64(j) { if e.Index() != uint64(j) {
t.Fatalf("queue error!") t.Fatalf("queue error!")
} }
j++ j++
@ -35,26 +35,26 @@ func TestScanHistory(t *testing.T) {
eh := newEventHistory(100) eh := newEventHistory(100)
// Add // Add
eh.addEvent(newEvent(Create, "/foo", 1, 1)) eh.addEvent(newEvent(Create, "/foo", 1))
eh.addEvent(newEvent(Create, "/foo/bar", 2, 1)) eh.addEvent(newEvent(Create, "/foo/bar", 2))
eh.addEvent(newEvent(Create, "/foo/foo", 3, 1)) eh.addEvent(newEvent(Create, "/foo/foo", 3))
eh.addEvent(newEvent(Create, "/foo/bar/bar", 4, 1)) eh.addEvent(newEvent(Create, "/foo/bar/bar", 4))
eh.addEvent(newEvent(Create, "/foo/foo/foo", 5, 1)) eh.addEvent(newEvent(Create, "/foo/foo/foo", 5))
e, err := eh.scan("/foo", 1) e, err := eh.scan("/foo", 1)
if err != nil || e.Index != 1 { if err != nil || e.Index() != 1 {
t.Fatalf("scan error [/foo] [1] %v", e.Index) t.Fatalf("scan error [/foo] [1] %v", e.Index)
} }
e, err = eh.scan("/foo/bar", 1) e, err = eh.scan("/foo/bar", 1)
if err != nil || e.Index != 2 { if err != nil || e.Index() != 2 {
t.Fatalf("scan error [/foo/bar] [2] %v", e.Index) t.Fatalf("scan error [/foo/bar] [2] %v", e.Index)
} }
e, err = eh.scan("/foo/bar", 3) e, err = eh.scan("/foo/bar", 3)
if err != nil || e.Index != 4 { if err != nil || e.Index() != 4 {
t.Fatalf("scan error [/foo/bar/bar] [4] %v", e.Index) t.Fatalf("scan error [/foo/bar/bar] [4] %v", e.Index)
} }

80
store/heap_test.go Normal file
View File

@ -0,0 +1,80 @@
package store
import (
"fmt"
"testing"
"time"
)
func TestHeapPushPop(t *testing.T) {
h := newTtlKeyHeap()
// add from older expire time to earlier expire time
// the path is equal to ttl from now
for i := 0; i < 10; i++ {
path := fmt.Sprintf("%v", 10-i)
m := time.Duration(10 - i)
n := newKV(nil, path, path, 0, nil, "", time.Now().Add(time.Second*m))
h.push(n)
}
min := time.Now()
for i := 0; i < 10; i++ {
node := h.pop()
if node.ExpireTime.Before(min) {
t.Fatal("heap sort wrong!")
}
min = node.ExpireTime
}
}
func TestHeapUpdate(t *testing.T) {
h := newTtlKeyHeap()
kvs := make([]*Node, 10)
// add from older expire time to earlier expire time
// the path is equal to ttl from now
for i, n := range kvs {
path := fmt.Sprintf("%v", 10-i)
m := time.Duration(10 - i)
n = newKV(nil, path, path, 0, nil, "", time.Now().Add(time.Second*m))
kvs[i] = n
h.push(n)
}
// Path 7
kvs[3].ExpireTime = time.Now().Add(time.Second * 11)
// Path 5
kvs[5].ExpireTime = time.Now().Add(time.Second * 12)
h.update(kvs[3])
h.update(kvs[5])
min := time.Now()
for i := 0; i < 10; i++ {
node := h.pop()
if node.ExpireTime.Before(min) {
t.Fatal("heap sort wrong!")
}
min = node.ExpireTime
if i == 8 {
if node.Path != "7" {
t.Fatal("heap sort wrong!", node.Path)
}
}
if i == 9 {
if node.Path != "5" {
t.Fatal("heap sort wrong!")
}
}
}
}

View File

@ -12,6 +12,7 @@ type KeyValuePair struct {
Expiration *time.Time `json:"expiration,omitempty"` Expiration *time.Time `json:"expiration,omitempty"`
TTL int64 `json:"ttl,omitempty"` // Time to live in second TTL int64 `json:"ttl,omitempty"` // Time to live in second
KVPairs kvPairs `json:"kvs,omitempty"` KVPairs kvPairs `json:"kvs,omitempty"`
ModifiedIndex uint64 `json:"modifiedIndex,omitempty"`
} }
type kvPairs []KeyValuePair type kvPairs []KeyValuePair

View File

@ -3,20 +3,12 @@ package store
import ( import (
"path" "path"
"sort" "sort"
"sync"
"time" "time"
etcdErr "github.com/coreos/etcd/error" etcdErr "github.com/coreos/etcd/error"
) )
var ( var Permanent time.Time
Permanent time.Time
)
const (
normal = iota
removed
)
// Node is the basic element in the store system. // Node is the basic element in the store system.
// A key-value pair will have a string value // A key-value pair will have a string value
@ -25,11 +17,9 @@ type Node struct {
Path string Path string
CreateIndex uint64 CreateIndex uint64
CreateTerm uint64
ModifiedIndex uint64 ModifiedIndex uint64
ModifiedTerm uint64
Parent *Node `json:"-"` // should not encode this field! avoid cyclical dependency. Parent *Node `json:"-"` // should not encode this field! avoid circular dependency.
ExpireTime time.Time ExpireTime time.Time
ACL string ACL string
@ -38,46 +28,34 @@ type Node struct {
// A reference to the store this node is attached to. // A reference to the store this node is attached to.
store *store store *store
// a ttl node will have an expire routine associated with it.
// we need a channel to stop that routine when the expiration changes.
stopExpire chan bool
// ensure we only delete the node once
// expire and remove may try to delete a node twice
once sync.Once
} }
// newKV creates a Key-Value pair // newKV creates a Key-Value pair
func newKV(store *store, nodePath string, value string, createIndex uint64, func newKV(store *store, nodePath string, value string, createIndex uint64,
createTerm uint64, parent *Node, ACL string, expireTime time.Time) *Node { parent *Node, ACL string, expireTime time.Time) *Node {
return &Node{ return &Node{
Path: nodePath, Path: nodePath,
CreateIndex: createIndex, CreateIndex: createIndex,
CreateTerm: createTerm,
ModifiedIndex: createIndex, ModifiedIndex: createIndex,
ModifiedTerm: createTerm,
Parent: parent, Parent: parent,
ACL: ACL, ACL: ACL,
store: store, store: store,
stopExpire: make(chan bool, 1),
ExpireTime: expireTime, ExpireTime: expireTime,
Value: value, Value: value,
} }
} }
// newDir creates a directory // newDir creates a directory
func newDir(store *store, nodePath string, createIndex uint64, createTerm uint64, func newDir(store *store, nodePath string, createIndex uint64, parent *Node,
parent *Node, ACL string, expireTime time.Time) *Node { ACL string, expireTime time.Time) *Node {
return &Node{ return &Node{
Path: nodePath, Path: nodePath,
CreateIndex: createIndex, CreateIndex: createIndex,
CreateTerm: createTerm, ModifiedIndex: createIndex,
Parent: parent, Parent: parent,
ACL: ACL, ACL: ACL,
stopExpire: make(chan bool, 1),
ExpireTime: expireTime, ExpireTime: expireTime,
Children: make(map[string]*Node), Children: make(map[string]*Node),
store: store, store: store,
@ -97,21 +75,10 @@ func (n *Node) IsHidden() bool {
// IsPermanent function checks if the node is a permanent one. // IsPermanent function checks if the node is a permanent one.
func (n *Node) IsPermanent() bool { func (n *Node) IsPermanent() bool {
return n.ExpireTime.Sub(Permanent) == 0 // we use a uninitialized time.Time to indicate the node is a
} // permanent one.
// the uninitialized time.Time should equal zero.
// IsExpired function checks if the node has been expired. return n.ExpireTime.IsZero()
func (n *Node) IsExpired() (bool, time.Duration) {
if n.IsPermanent() {
return false, 0
}
duration := n.ExpireTime.Sub(time.Now())
if duration <= 0 {
return true, 0
}
return false, duration
} }
// IsDir function checks whether the node is a directory. // IsDir function checks whether the node is a directory.
@ -125,7 +92,7 @@ func (n *Node) IsDir() bool {
// If the receiver node is not a key-value pair, a "Not A File" error will be returned. // If the receiver node is not a key-value pair, a "Not A File" error will be returned.
func (n *Node) Read() (string, *etcdErr.Error) { func (n *Node) Read() (string, *etcdErr.Error) {
if n.IsDir() { if n.IsDir() {
return "", etcdErr.NewError(etcdErr.EcodeNotFile, "", UndefIndex, UndefTerm) return "", etcdErr.NewError(etcdErr.EcodeNotFile, "", n.store.Index())
} }
return n.Value, nil return n.Value, nil
@ -133,20 +100,19 @@ func (n *Node) Read() (string, *etcdErr.Error) {
// Write function set the value of the node to the given value. // Write function set the value of the node to the given value.
// If the receiver node is a directory, a "Not A File" error will be returned. // If the receiver node is a directory, a "Not A File" error will be returned.
func (n *Node) Write(value string, index uint64, term uint64) *etcdErr.Error { func (n *Node) Write(value string, index uint64) *etcdErr.Error {
if n.IsDir() { if n.IsDir() {
return etcdErr.NewError(etcdErr.EcodeNotFile, "", UndefIndex, UndefTerm) return etcdErr.NewError(etcdErr.EcodeNotFile, "", n.store.Index())
} }
n.Value = value n.Value = value
n.ModifiedIndex = index n.ModifiedIndex = index
n.ModifiedTerm = term
return nil return nil
} }
func (n *Node) ExpirationAndTTL() (*time.Time, int64) { func (n *Node) ExpirationAndTTL() (*time.Time, int64) {
if n.ExpireTime.Sub(Permanent) != 0 { if !n.IsPermanent() {
return &n.ExpireTime, int64(n.ExpireTime.Sub(time.Now())/time.Second) + 1 return &n.ExpireTime, int64(n.ExpireTime.Sub(time.Now())/time.Second) + 1
} }
return nil, 0 return nil, 0
@ -156,7 +122,7 @@ func (n *Node) ExpirationAndTTL() (*time.Time, int64) {
// If the receiver node is not a directory, a "Not A Directory" error will be returned. // If the receiver node is not a directory, a "Not A Directory" error will be returned.
func (n *Node) List() ([]*Node, *etcdErr.Error) { func (n *Node) List() ([]*Node, *etcdErr.Error) {
if !n.IsDir() { if !n.IsDir() {
return nil, etcdErr.NewError(etcdErr.EcodeNotDir, "", UndefIndex, UndefTerm) return nil, etcdErr.NewError(etcdErr.EcodeNotDir, "", n.store.Index())
} }
nodes := make([]*Node, len(n.Children)) nodes := make([]*Node, len(n.Children))
@ -174,7 +140,7 @@ func (n *Node) List() ([]*Node, *etcdErr.Error) {
// On success, it returns the file node // On success, it returns the file node
func (n *Node) GetChild(name string) (*Node, *etcdErr.Error) { func (n *Node) GetChild(name string) (*Node, *etcdErr.Error) {
if !n.IsDir() { if !n.IsDir() {
return nil, etcdErr.NewError(etcdErr.EcodeNotDir, n.Path, UndefIndex, UndefTerm) return nil, etcdErr.NewError(etcdErr.EcodeNotDir, n.Path, n.store.Index())
} }
child, ok := n.Children[name] child, ok := n.Children[name]
@ -192,7 +158,7 @@ func (n *Node) GetChild(name string) (*Node, *etcdErr.Error) {
// error will be returned // error will be returned
func (n *Node) Add(child *Node) *etcdErr.Error { func (n *Node) Add(child *Node) *etcdErr.Error {
if !n.IsDir() { if !n.IsDir() {
return etcdErr.NewError(etcdErr.EcodeNotDir, "", UndefIndex, UndefTerm) return etcdErr.NewError(etcdErr.EcodeNotDir, "", n.store.Index())
} }
_, name := path.Split(child.Path) _, name := path.Split(child.Path)
@ -200,7 +166,7 @@ func (n *Node) Add(child *Node) *etcdErr.Error {
_, ok := n.Children[name] _, ok := n.Children[name]
if ok { if ok {
return etcdErr.NewError(etcdErr.EcodeNodeExist, "", UndefIndex, UndefTerm) return etcdErr.NewError(etcdErr.EcodeNodeExist, "", n.store.Index())
} }
n.Children[name] = child n.Children[name] = child
@ -213,22 +179,9 @@ func (n *Node) Remove(recursive bool, callback func(path string)) *etcdErr.Error
if n.IsDir() && !recursive { if n.IsDir() && !recursive {
// cannot delete a directory without set recursive to true // cannot delete a directory without set recursive to true
return etcdErr.NewError(etcdErr.EcodeNotFile, "", UndefIndex, UndefTerm) return etcdErr.NewError(etcdErr.EcodeNotFile, "", n.store.Index())
} }
onceBody := func() {
n.internalRemove(recursive, callback)
}
// this function might be entered multiple times by expire and delete
// every node will only be deleted once.
n.once.Do(onceBody)
return nil
}
// internalRemove function will be called by remove()
func (n *Node) internalRemove(recursive bool, callback func(path string)) {
if !n.IsDir() { // key-value pair if !n.IsDir() { // key-value pair
_, name := path.Split(n.Path) _, name := path.Split(n.Path)
@ -241,9 +194,11 @@ func (n *Node) internalRemove(recursive bool, callback func(path string)) {
callback(n.Path) callback(n.Path)
} }
// the stop channel has a buffer. just send to it! if !n.IsPermanent() {
n.stopExpire <- true n.store.ttlKeyHeap.remove(n)
return }
return nil
} }
for _, child := range n.Children { // delete all children for _, child := range n.Children { // delete all children
@ -259,61 +214,13 @@ func (n *Node) internalRemove(recursive bool, callback func(path string)) {
callback(n.Path) callback(n.Path)
} }
n.stopExpire <- true if !n.IsPermanent() {
} n.store.ttlKeyHeap.remove(n)
} }
// Expire function will test if the node is expired.
// if the node is already expired, delete the node and return.
// if the node is permanent (this shouldn't happen), return at once.
// else wait for a period time, then remove the node. and notify the watchhub.
func (n *Node) Expire() {
expired, duration := n.IsExpired()
if expired { // has been expired
// since the parent function of Expire() runs serially,
// there is no need for lock here
e := newEvent(Expire, n.Path, UndefIndex, UndefTerm)
n.store.WatcherHub.notify(e)
n.Remove(true, nil)
n.store.Stats.Inc(ExpireCount)
return
} }
if duration == 0 { // Permanent Node return nil
return
}
go func() { // do monitoring
select {
// if timeout, delete the node
case <-time.After(duration):
// before expire get the lock, the expiration time
// of the node may be updated.
// we have to check again when get the lock
n.store.worldLock.Lock()
defer n.store.worldLock.Unlock()
expired, _ := n.IsExpired()
if expired {
e := newEvent(Expire, n.Path, UndefIndex, UndefTerm)
n.store.WatcherHub.notify(e)
n.Remove(true, nil)
n.store.Stats.Inc(ExpireCount)
}
return
// if stopped, return
case <-n.stopExpire:
return
}
}()
} }
func (n *Node) Pair(recurisive, sorted bool) KeyValuePair { func (n *Node) Pair(recurisive, sorted bool) KeyValuePair {
@ -321,6 +228,7 @@ func (n *Node) Pair(recurisive, sorted bool) KeyValuePair {
pair := KeyValuePair{ pair := KeyValuePair{
Key: n.Path, Key: n.Path,
Dir: true, Dir: true,
ModifiedIndex: n.ModifiedIndex,
} }
pair.Expiration, pair.TTL = n.ExpirationAndTTL() pair.Expiration, pair.TTL = n.ExpirationAndTTL()
@ -358,26 +266,33 @@ func (n *Node) Pair(recurisive, sorted bool) KeyValuePair {
pair := KeyValuePair{ pair := KeyValuePair{
Key: n.Path, Key: n.Path,
Value: n.Value, Value: n.Value,
ModifiedIndex: n.ModifiedIndex,
} }
pair.Expiration, pair.TTL = n.ExpirationAndTTL() pair.Expiration, pair.TTL = n.ExpirationAndTTL()
return pair return pair
} }
func (n *Node) UpdateTTL(expireTime time.Time) { func (n *Node) UpdateTTL(expireTime time.Time) {
if !n.IsPermanent() { if !n.IsPermanent() {
// check if the node has been expired if expireTime.IsZero() {
// if the node is not expired, we need to stop the go routine associated with // from ttl to permanent
// that node. // remove from ttl heap
expired, _ := n.IsExpired() n.store.ttlKeyHeap.remove(n)
} else {
if !expired { // update ttl
n.stopExpire <- true // suspend it to modify the expiration
}
}
n.ExpireTime = expireTime n.ExpireTime = expireTime
if expireTime.Sub(Permanent) != 0 { // update ttl heap
n.Expire() n.store.ttlKeyHeap.update(n)
}
} else {
if !expireTime.IsZero() {
// from permanent to ttl
n.ExpireTime = expireTime
// push into ttl heap
n.store.ttlKeyHeap.push(n)
}
} }
} }
@ -386,10 +301,10 @@ func (n *Node) UpdateTTL(expireTime time.Time) {
// If the node is a key-value pair, it will clone the pair. // If the node is a key-value pair, it will clone the pair.
func (n *Node) Clone() *Node { func (n *Node) Clone() *Node {
if !n.IsDir() { if !n.IsDir() {
return newKV(n.store, n.Path, n.Value, n.CreateIndex, n.CreateTerm, n.Parent, n.ACL, n.ExpireTime) return newKV(n.store, n.Path, n.Value, n.CreateIndex, n.Parent, n.ACL, n.ExpireTime)
} }
clone := newDir(n.store, n.Path, n.CreateIndex, n.CreateTerm, n.Parent, n.ACL, n.ExpireTime) clone := newDir(n.store, n.Path, n.CreateIndex, n.Parent, n.ACL, n.ExpireTime)
for key, child := range n.Children { for key, child := range n.Children {
clone.Children[key] = child.Clone() clone.Children[key] = child.Clone()
@ -414,7 +329,8 @@ func (n *Node) recoverAndclean() {
} }
} }
n.stopExpire = make(chan bool, 1) if !n.ExpireTime.IsZero() {
n.store.ttlKeyHeap.push(n)
n.Expire() }
} }

View File

@ -10,85 +10,92 @@ import (
// Ensure that a successful Get is recorded in the stats. // Ensure that a successful Get is recorded in the stats.
func TestStoreStatsGetSuccess(t *testing.T) { func TestStoreStatsGetSuccess(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "bar", false, Permanent, 3, 1) s.Create("/foo", "bar", false, Permanent)
s.Get("/foo", false, false, 3, 1) s.Get("/foo", false, false)
assert.Equal(t, uint64(1), s.Stats.GetSuccess, "") assert.Equal(t, uint64(1), s.Stats.GetSuccess, "")
} }
// Ensure that a failed Get is recorded in the stats. // Ensure that a failed Get is recorded in the stats.
func TestStoreStatsGetFail(t *testing.T) { func TestStoreStatsGetFail(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "bar", false, Permanent, 3, 1) s.Create("/foo", "bar", false, Permanent)
s.Get("/no_such_key", false, false, 3, 1) s.Get("/no_such_key", false, false)
assert.Equal(t, uint64(1), s.Stats.GetFail, "") assert.Equal(t, uint64(1), s.Stats.GetFail, "")
} }
// Ensure that a successful Create is recorded in the stats. // Ensure that a successful Create is recorded in the stats.
func TestStoreStatsCreateSuccess(t *testing.T) { func TestStoreStatsCreateSuccess(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "bar", false, Permanent, 3, 1) s.Create("/foo", "bar", false, Permanent)
assert.Equal(t, uint64(1), s.Stats.CreateSuccess, "") assert.Equal(t, uint64(1), s.Stats.CreateSuccess, "")
} }
// Ensure that a failed Create is recorded in the stats. // Ensure that a failed Create is recorded in the stats.
func TestStoreStatsCreateFail(t *testing.T) { func TestStoreStatsCreateFail(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "", false, Permanent, 3, 1) s.Create("/foo", "", false, Permanent)
s.Create("/foo", "bar", false, Permanent, 4, 1) s.Create("/foo", "bar", false, Permanent)
assert.Equal(t, uint64(1), s.Stats.CreateFail, "") assert.Equal(t, uint64(1), s.Stats.CreateFail, "")
} }
// Ensure that a successful Update is recorded in the stats. // Ensure that a successful Update is recorded in the stats.
func TestStoreStatsUpdateSuccess(t *testing.T) { func TestStoreStatsUpdateSuccess(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "bar", false, Permanent, 3, 1) s.Create("/foo", "bar", false, Permanent)
s.Update("/foo", "baz", Permanent, 4, 1) s.Update("/foo", "baz", Permanent)
assert.Equal(t, uint64(1), s.Stats.UpdateSuccess, "") assert.Equal(t, uint64(1), s.Stats.UpdateSuccess, "")
} }
// Ensure that a failed Update is recorded in the stats. // Ensure that a failed Update is recorded in the stats.
func TestStoreStatsUpdateFail(t *testing.T) { func TestStoreStatsUpdateFail(t *testing.T) {
s := newStore() s := newStore()
s.Update("/foo", "bar", Permanent, 4, 1) s.Update("/foo", "bar", Permanent)
assert.Equal(t, uint64(1), s.Stats.UpdateFail, "") assert.Equal(t, uint64(1), s.Stats.UpdateFail, "")
} }
// Ensure that a successful CAS is recorded in the stats. // Ensure that a successful CAS is recorded in the stats.
func TestStoreStatsCompareAndSwapSuccess(t *testing.T) { func TestStoreStatsCompareAndSwapSuccess(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "bar", false, Permanent, 3, 1) s.Create("/foo", "bar", false, Permanent)
s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent, 4, 1) s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent)
assert.Equal(t, uint64(1), s.Stats.CompareAndSwapSuccess, "") assert.Equal(t, uint64(1), s.Stats.CompareAndSwapSuccess, "")
} }
// Ensure that a failed CAS is recorded in the stats. // Ensure that a failed CAS is recorded in the stats.
func TestStoreStatsCompareAndSwapFail(t *testing.T) { func TestStoreStatsCompareAndSwapFail(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "bar", false, Permanent, 3, 1) s.Create("/foo", "bar", false, Permanent)
s.CompareAndSwap("/foo", "wrong_value", 0, "baz", Permanent, 4, 1) s.CompareAndSwap("/foo", "wrong_value", 0, "baz", Permanent)
assert.Equal(t, uint64(1), s.Stats.CompareAndSwapFail, "") assert.Equal(t, uint64(1), s.Stats.CompareAndSwapFail, "")
} }
// Ensure that a successful Delete is recorded in the stats. // Ensure that a successful Delete is recorded in the stats.
func TestStoreStatsDeleteSuccess(t *testing.T) { func TestStoreStatsDeleteSuccess(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "bar", false, Permanent, 3, 1) s.Create("/foo", "bar", false, Permanent)
s.Delete("/foo", false, 4, 1) s.Delete("/foo", false)
assert.Equal(t, uint64(1), s.Stats.DeleteSuccess, "") assert.Equal(t, uint64(1), s.Stats.DeleteSuccess, "")
} }
// Ensure that a failed Delete is recorded in the stats. // Ensure that a failed Delete is recorded in the stats.
func TestStoreStatsDeleteFail(t *testing.T) { func TestStoreStatsDeleteFail(t *testing.T) {
s := newStore() s := newStore()
s.Delete("/foo", false, 4, 1) s.Delete("/foo", false)
assert.Equal(t, uint64(1), s.Stats.DeleteFail, "") assert.Equal(t, uint64(1), s.Stats.DeleteFail, "")
} }
//Ensure that the number of expirations is recorded in the stats. //Ensure that the number of expirations is recorded in the stats.
func TestStoreStatsExpireCount(t *testing.T) { func TestStoreStatsExpireCount(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "bar", false, time.Now().Add(5 * time.Millisecond), 3, 1)
c := make(chan bool)
defer func() {
c <- true
}()
go mockSyncService(s.DeleteExpiredKeys, c)
s.Create("/foo", "bar", false, time.Now().Add(500*time.Millisecond))
assert.Equal(t, uint64(0), s.Stats.ExpireCount, "") assert.Equal(t, uint64(0), s.Stats.ExpireCount, "")
time.Sleep(10 * time.Millisecond) time.Sleep(600 * time.Millisecond)
assert.Equal(t, uint64(1), s.Stats.ExpireCount, "") assert.Equal(t, uint64(1), s.Stats.ExpireCount, "")
} }

View File

@ -41,28 +41,30 @@ func init() {
type Store interface { type Store interface {
Version() int Version() int
CommandFactory() CommandFactory CommandFactory() CommandFactory
Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error) Index() uint64
Set(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) Get(nodePath string, recursive, sorted bool) (*Event, error)
Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error) Set(nodePath string, value string, expireTime time.Time) (*Event, error)
Create(nodePath string, value string, incrementalSuffix bool, expireTime time.Time, Update(nodePath string, newValue string, expireTime time.Time) (*Event, error)
index uint64, term uint64) (*Event, error) Create(nodePath string, value string, incrementalSuffix bool,
expireTime time.Time) (*Event, error)
CompareAndSwap(nodePath string, prevValue string, prevIndex uint64, CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
value string, expireTime time.Time, index uint64, term uint64) (*Event, error) value string, expireTime time.Time) (*Event, error)
Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error) Delete(nodePath string, recursive bool) (*Event, error)
Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error) Watch(prefix string, recursive bool, sinceIndex uint64) (<-chan *Event, error)
Save() ([]byte, error) Save() ([]byte, error)
Recovery(state []byte) error Recovery(state []byte) error
TotalTransactions() uint64 TotalTransactions() uint64
JsonStats() []byte JsonStats() []byte
DeleteExpiredKeys(cutoff time.Time)
} }
type store struct { type store struct {
Root *Node Root *Node
WatcherHub *watcherHub WatcherHub *watcherHub
Index uint64 CurrentIndex uint64
Term uint64
Stats *Stats Stats *Stats
CurrentVersion int CurrentVersion int
ttlKeyHeap *ttlKeyHeap // need to recovery manually
worldLock sync.RWMutex // stop the world lock worldLock sync.RWMutex // stop the world lock
} }
@ -73,9 +75,10 @@ func New() Store {
func newStore() *store { func newStore() *store {
s := new(store) s := new(store)
s.CurrentVersion = defaultVersion s.CurrentVersion = defaultVersion
s.Root = newDir(s, "/", UndefIndex, UndefTerm, nil, "", Permanent) s.Root = newDir(s, "/", s.CurrentIndex, nil, "", Permanent)
s.Stats = newStats() s.Stats = newStats()
s.WatcherHub = newWatchHub(1000) s.WatcherHub = newWatchHub(1000)
s.ttlKeyHeap = newTtlKeyHeap()
return s return s
} }
@ -84,6 +87,11 @@ func (s *store) Version() int {
return s.CurrentVersion return s.CurrentVersion
} }
// Retrieves current of the store
func (s *store) Index() uint64 {
return s.CurrentIndex
}
// CommandFactory retrieves the command factory for the current version of the store. // CommandFactory retrieves the command factory for the current version of the store.
func (s *store) CommandFactory() CommandFactory { func (s *store) CommandFactory() CommandFactory {
return GetCommandFactory(s.Version()) return GetCommandFactory(s.Version())
@ -92,20 +100,20 @@ func (s *store) CommandFactory() CommandFactory {
// Get function returns a get event. // Get function returns a get event.
// If recursive is true, it will return all the content under the node path. // If recursive is true, it will return all the content under the node path.
// If sorted is true, it will sort the content by keys. // If sorted is true, it will sort the content by keys.
func (s *store) Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error) { func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
s.worldLock.RLock() s.worldLock.RLock()
defer s.worldLock.RUnlock() defer s.worldLock.RUnlock()
nodePath = path.Clean(path.Join("/", nodePath)) nodePath = path.Clean(path.Join("/", nodePath))
n, err := s.internalGet(nodePath, index, term) n, err := s.internalGet(nodePath)
if err != nil { if err != nil {
s.Stats.Inc(GetFail) s.Stats.Inc(GetFail)
return nil, err return nil, err
} }
e := newEvent(Get, nodePath, index, term) e := newEvent(Get, nodePath, n.ModifiedIndex)
if n.IsDir() { // node is a directory if n.IsDir() { // node is a directory
e.Dir = true e.Dir = true
@ -147,13 +155,12 @@ func (s *store) Get(nodePath string, recursive, sorted bool, index uint64, term
// Create function creates the Node at nodePath. Create will help to create intermediate directories with no ttl. // Create function creates the Node at nodePath. Create will help to create intermediate directories with no ttl.
// If the node has already existed, create will fail. // If the node has already existed, create will fail.
// If any node on the path is a file, create will fail. // If any node on the path is a file, create will fail.
func (s *store) Create(nodePath string, value string, unique bool, func (s *store) Create(nodePath string, value string, unique bool, expireTime time.Time) (*Event, error) {
expireTime time.Time, index uint64, term uint64) (*Event, error) {
nodePath = path.Clean(path.Join("/", nodePath)) nodePath = path.Clean(path.Join("/", nodePath))
s.worldLock.Lock() s.worldLock.Lock()
defer s.worldLock.Unlock() defer s.worldLock.Unlock()
e, err := s.internalCreate(nodePath, value, unique, false, expireTime, index, term, Create) e, err := s.internalCreate(nodePath, value, unique, false, expireTime, Create)
if err == nil { if err == nil {
s.Stats.Inc(CreateSuccess) s.Stats.Inc(CreateSuccess)
@ -165,12 +172,12 @@ func (s *store) Create(nodePath string, value string, unique bool,
} }
// Set function creates or replace the Node at nodePath. // Set function creates or replace the Node at nodePath.
func (s *store) Set(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { func (s *store) Set(nodePath string, value string, expireTime time.Time) (*Event, error) {
nodePath = path.Clean(path.Join("/", nodePath)) nodePath = path.Clean(path.Join("/", nodePath))
s.worldLock.Lock() s.worldLock.Lock()
defer s.worldLock.Unlock() defer s.worldLock.Unlock()
e, err := s.internalCreate(nodePath, value, false, true, expireTime, index, term, Set) e, err := s.internalCreate(nodePath, value, false, true, expireTime, Set)
if err == nil { if err == nil {
s.Stats.Inc(SetSuccess) s.Stats.Inc(SetSuccess)
@ -182,14 +189,14 @@ func (s *store) Set(nodePath string, value string, expireTime time.Time, index u
} }
func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64, func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { value string, expireTime time.Time) (*Event, error) {
nodePath = path.Clean(path.Join("/", nodePath)) nodePath = path.Clean(path.Join("/", nodePath))
s.worldLock.Lock() s.worldLock.Lock()
defer s.worldLock.Unlock() defer s.worldLock.Unlock()
n, err := s.internalGet(nodePath, index, term) n, err := s.internalGet(nodePath)
if err != nil { if err != nil {
s.Stats.Inc(CompareAndSwapFail) s.Stats.Inc(CompareAndSwapFail)
@ -198,17 +205,20 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
if n.IsDir() { // can only test and set file if n.IsDir() { // can only test and set file
s.Stats.Inc(CompareAndSwapFail) s.Stats.Inc(CompareAndSwapFail)
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term) return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
} }
// If both of the prevValue and prevIndex are given, we will test both of them. // If both of the prevValue and prevIndex are given, we will test both of them.
// Command will be executed, only if both of the tests are successful. // Command will be executed, only if both of the tests are successful.
if (prevValue == "" || n.Value == prevValue) && (prevIndex == 0 || n.ModifiedIndex == prevIndex) { if (prevValue == "" || n.Value == prevValue) && (prevIndex == 0 || n.ModifiedIndex == prevIndex) {
e := newEvent(CompareAndSwap, nodePath, index, term) // update etcd index
s.CurrentIndex++
e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex)
e.PrevValue = n.Value e.PrevValue = n.Value
// if test succeed, write the value // if test succeed, write the value
n.Write(value, index, term) n.Write(value, s.CurrentIndex)
n.UpdateTTL(expireTime) n.UpdateTTL(expireTime)
e.Value = value e.Value = value
@ -221,25 +231,27 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex) cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
s.Stats.Inc(CompareAndSwapFail) s.Stats.Inc(CompareAndSwapFail)
return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, index, term) return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
} }
// Delete function deletes the node at the given path. // Delete function deletes the node at the given path.
// If the node is a directory, recursive must be true to delete it. // If the node is a directory, recursive must be true to delete it.
func (s *store) Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error) { func (s *store) Delete(nodePath string, recursive bool) (*Event, error) {
nodePath = path.Clean(path.Join("/", nodePath)) nodePath = path.Clean(path.Join("/", nodePath))
s.worldLock.Lock() s.worldLock.Lock()
defer s.worldLock.Unlock() defer s.worldLock.Unlock()
n, err := s.internalGet(nodePath, index, term) nextIndex := s.CurrentIndex + 1
n, err := s.internalGet(nodePath)
if err != nil { // if the node does not exist, return error if err != nil { // if the node does not exist, return error
s.Stats.Inc(DeleteFail) s.Stats.Inc(DeleteFail)
return nil, err return nil, err
} }
e := newEvent(Delete, nodePath, index, term) e := newEvent(Delete, nodePath, nextIndex)
if n.IsDir() { if n.IsDir() {
e.Dir = true e.Dir = true
@ -259,33 +271,37 @@ func (s *store) Delete(nodePath string, recursive bool, index uint64, term uint6
return nil, err return nil, err
} }
// update etcd index
s.CurrentIndex++
s.WatcherHub.notify(e) s.WatcherHub.notify(e)
s.Stats.Inc(DeleteSuccess) s.Stats.Inc(DeleteSuccess)
return e, nil return e, nil
} }
func (s *store) Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error) { func (s *store) Watch(prefix string, recursive bool, sinceIndex uint64) (<-chan *Event, error) {
prefix = path.Clean(path.Join("/", prefix)) prefix = path.Clean(path.Join("/", prefix))
nextIndex := s.CurrentIndex + 1
s.worldLock.RLock() s.worldLock.RLock()
defer s.worldLock.RUnlock() defer s.worldLock.RUnlock()
s.Index, s.Term = index, term
var c <-chan *Event var c <-chan *Event
var err *etcdErr.Error var err *etcdErr.Error
if sinceIndex == 0 { if sinceIndex == 0 {
c, err = s.WatcherHub.watch(prefix, recursive, index+1) c, err = s.WatcherHub.watch(prefix, recursive, nextIndex)
} else { } else {
c, err = s.WatcherHub.watch(prefix, recursive, sinceIndex) c, err = s.WatcherHub.watch(prefix, recursive, sinceIndex)
} }
if err != nil { if err != nil {
err.Index = index // watchhub do not know the current Index
err.Term = term // we need to attach the currentIndex here
err.Index = s.CurrentIndex
return nil, err return nil, err
} }
@ -317,52 +333,59 @@ func (s *store) walk(nodePath string, walkFunc func(prev *Node, component string
// Update function updates the value/ttl of the node. // Update function updates the value/ttl of the node.
// If the node is a file, the value and the ttl can be updated. // If the node is a file, the value and the ttl can be updated.
// If the node is a directory, only the ttl can be updated. // If the node is a directory, only the ttl can be updated.
func (s *store) Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error) { func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (*Event, error) {
s.worldLock.Lock() s.worldLock.Lock()
defer s.worldLock.Unlock() defer s.worldLock.Unlock()
currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
nodePath = path.Clean(path.Join("/", nodePath)) nodePath = path.Clean(path.Join("/", nodePath))
n, err := s.internalGet(nodePath, index, term) n, err := s.internalGet(nodePath)
if err != nil { // if the node does not exist, return error if err != nil { // if the node does not exist, return error
s.Stats.Inc(UpdateFail) s.Stats.Inc(UpdateFail)
return nil, err return nil, err
} }
e := newEvent(Update, nodePath, s.Index, s.Term) e := newEvent(Update, nodePath, nextIndex)
if len(newValue) != 0 { if len(newValue) != 0 {
if n.IsDir() { if n.IsDir() {
// if the node is a directory, we cannot update value // if the node is a directory, we cannot update value
s.Stats.Inc(UpdateFail) s.Stats.Inc(UpdateFail)
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term) return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
} }
e.PrevValue = n.Value e.PrevValue = n.Value
n.Write(newValue, index, term) n.Write(newValue, nextIndex)
e.Value = newValue
} else {
// do not update value
e.Value = n.Value
} }
// update ttl // update ttl
n.UpdateTTL(expireTime) n.UpdateTTL(expireTime)
e.Value = newValue
e.Expiration, e.TTL = n.ExpirationAndTTL() e.Expiration, e.TTL = n.ExpirationAndTTL()
s.WatcherHub.notify(e) s.WatcherHub.notify(e)
s.Stats.Inc(UpdateSuccess) s.Stats.Inc(UpdateSuccess)
s.CurrentIndex = nextIndex
return e, nil return e, nil
} }
func (s *store) internalCreate(nodePath string, value string, unique bool, replace bool, func (s *store) internalCreate(nodePath string, value string, unique bool, replace bool,
expireTime time.Time, index uint64, term uint64, action string) (*Event, error) { expireTime time.Time, action string) (*Event, error) {
s.Index, s.Term = index, term currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
if unique { // append unique item under the node path if unique { // append unique item under the node path
nodePath += "/" + strconv.FormatUint(index, 10) nodePath += "/" + strconv.FormatUint(nextIndex, 10)
} }
nodePath = path.Clean(path.Join("/", nodePath)) nodePath = path.Clean(path.Join("/", nodePath))
@ -381,11 +404,11 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla
if err != nil { if err != nil {
s.Stats.Inc(SetFail) s.Stats.Inc(SetFail)
err.Index, err.Term = s.Index, s.Term err.Index = currIndex
return nil, err return nil, err
} }
e := newEvent(action, nodePath, s.Index, s.Term) e := newEvent(action, nodePath, nextIndex)
n, _ := d.GetChild(newNodeName) n, _ := d.GetChild(newNodeName)
@ -393,25 +416,25 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla
if n != nil { if n != nil {
if replace { if replace {
if n.IsDir() { if n.IsDir() {
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term) return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
} }
e.PrevValue, _ = n.Read() e.PrevValue, _ = n.Read()
n.Remove(false, nil) n.Remove(false, nil)
} else { } else {
return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, index, term) return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, currIndex)
} }
} }
if len(value) != 0 { // create file if len(value) != 0 { // create file
e.Value = value e.Value = value
n = newKV(s, nodePath, value, index, term, d, "", expireTime) n = newKV(s, nodePath, value, nextIndex, d, "", expireTime)
} else { // create directory } else { // create directory
e.Dir = true e.Dir = true
n = newDir(s, nodePath, index, term, d, "", expireTime) n = newDir(s, nodePath, nextIndex, d, "", expireTime)
} }
@ -419,28 +442,26 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla
d.Add(n) d.Add(n)
// Node with TTL // Node with TTL
if expireTime.Sub(Permanent) != 0 { if !n.IsPermanent() {
n.Expire() s.ttlKeyHeap.push(n)
e.Expiration, e.TTL = n.ExpirationAndTTL() e.Expiration, e.TTL = n.ExpirationAndTTL()
} }
s.CurrentIndex = nextIndex
s.WatcherHub.notify(e) s.WatcherHub.notify(e)
return e, nil return e, nil
} }
// InternalGet function get the node of the given nodePath. // InternalGet function get the node of the given nodePath.
func (s *store) internalGet(nodePath string, index uint64, term uint64) (*Node, *etcdErr.Error) { func (s *store) internalGet(nodePath string) (*Node, *etcdErr.Error) {
nodePath = path.Clean(path.Join("/", nodePath)) nodePath = path.Clean(path.Join("/", nodePath))
// update file system known index and term
if index > s.Index {
s.Index, s.Term = index, term
}
walkFunc := func(parent *Node, name string) (*Node, *etcdErr.Error) { walkFunc := func(parent *Node, name string) (*Node, *etcdErr.Error) {
if !parent.IsDir() { if !parent.IsDir() {
err := etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, index, term) err := etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, s.CurrentIndex)
return nil, err return nil, err
} }
@ -449,7 +470,7 @@ func (s *store) internalGet(nodePath string, index uint64, term uint64) (*Node,
return child, nil return child, nil
} }
return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name), index, term) return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name), s.CurrentIndex)
} }
f, err := s.walk(nodePath, walkFunc) f, err := s.walk(nodePath, walkFunc)
@ -460,6 +481,28 @@ func (s *store) internalGet(nodePath string, index uint64, term uint64) (*Node,
return f, nil return f, nil
} }
// deleteExpiredKyes will delete all
func (s *store) DeleteExpiredKeys(cutoff time.Time) {
s.worldLock.Lock()
defer s.worldLock.Unlock()
for {
node := s.ttlKeyHeap.top()
if node == nil || node.ExpireTime.After(cutoff) {
break
}
s.ttlKeyHeap.pop()
node.Remove(true, nil)
s.CurrentIndex++
s.Stats.Inc(ExpireCount)
s.WatcherHub.notify(newEvent(Expire, node.Path, s.CurrentIndex))
}
}
// checkDir function will check whether the component is a directory under parent node. // checkDir function will check whether the component is a directory under parent node.
// If it is a directory, this function will return the pointer to that node. // If it is a directory, this function will return the pointer to that node.
// If it does not exist, this function will create a new directory and return the pointer to that node. // If it does not exist, this function will create a new directory and return the pointer to that node.
@ -472,10 +515,10 @@ func (s *store) checkDir(parent *Node, dirName string) (*Node, *etcdErr.Error) {
return node, nil return node, nil
} }
return nil, etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, UndefIndex, UndefTerm) return nil, etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, s.CurrentIndex)
} }
n := newDir(s, path.Join(parent.Path, dirName), s.Index, s.Term, parent, parent.ACL, Permanent) n := newDir(s, path.Join(parent.Path, dirName), s.CurrentIndex+1, parent, parent.ACL, Permanent)
parent.Children[dirName] = n parent.Children[dirName] = n
@ -490,8 +533,7 @@ func (s *store) Save() ([]byte, error) {
s.worldLock.Lock() s.worldLock.Lock()
clonedStore := newStore() clonedStore := newStore()
clonedStore.Index = s.Index clonedStore.CurrentIndex = s.CurrentIndex
clonedStore.Term = s.Term
clonedStore.Root = s.Root.Clone() clonedStore.Root = s.Root.Clone()
clonedStore.WatcherHub = s.WatcherHub.clone() clonedStore.WatcherHub = s.WatcherHub.clone()
clonedStore.Stats = s.Stats.clone() clonedStore.Stats = s.Stats.clone()
@ -521,6 +563,8 @@ func (s *store) Recovery(state []byte) error {
return err return err
} }
s.ttlKeyHeap = newTtlKeyHeap()
s.Root.recoverAndclean() s.Root.recoverAndclean()
return nil return nil
} }

View File

@ -27,8 +27,8 @@ import (
// Ensure that the store can retrieve an existing value. // Ensure that the store can retrieve an existing value.
func TestStoreGetValue(t *testing.T) { func TestStoreGetValue(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "bar", false, Permanent, 2, 1) s.Create("/foo", "bar", false, Permanent)
e, err := s.Get("/foo", false, false, 2, 1) e, err := s.Get("/foo", false, false)
assert.Nil(t, err, "") assert.Nil(t, err, "")
assert.Equal(t, e.Action, "get", "") assert.Equal(t, e.Action, "get", "")
assert.Equal(t, e.Key, "/foo", "") assert.Equal(t, e.Key, "/foo", "")
@ -39,14 +39,14 @@ func TestStoreGetValue(t *testing.T) {
// Note that hidden files should not be returned. // Note that hidden files should not be returned.
func TestStoreGetDirectory(t *testing.T) { func TestStoreGetDirectory(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "", false, Permanent, 2, 1) s.Create("/foo", "", false, Permanent)
s.Create("/foo/bar", "X", false, Permanent, 3, 1) s.Create("/foo/bar", "X", false, Permanent)
s.Create("/foo/_hidden", "*", false, Permanent, 4, 1) s.Create("/foo/_hidden", "*", false, Permanent)
s.Create("/foo/baz", "", false, Permanent, 5, 1) s.Create("/foo/baz", "", false, Permanent)
s.Create("/foo/baz/bat", "Y", false, Permanent, 6, 1) s.Create("/foo/baz/bat", "Y", false, Permanent)
s.Create("/foo/baz/_hidden", "*", false, Permanent, 7, 1) s.Create("/foo/baz/_hidden", "*", false, Permanent)
s.Create("/foo/baz/ttl", "Y", false, time.Now().Add(time.Second*3), 8, 1) s.Create("/foo/baz/ttl", "Y", false, time.Now().Add(time.Second*3))
e, err := s.Get("/foo", true, false, 8, 1) e, err := s.Get("/foo", true, false)
assert.Nil(t, err, "") assert.Nil(t, err, "")
assert.Equal(t, e.Action, "get", "") assert.Equal(t, e.Action, "get", "")
assert.Equal(t, e.Key, "/foo", "") assert.Equal(t, e.Key, "/foo", "")
@ -69,13 +69,13 @@ func TestStoreGetDirectory(t *testing.T) {
// Ensure that the store can retrieve a directory in sorted order. // Ensure that the store can retrieve a directory in sorted order.
func TestStoreGetSorted(t *testing.T) { func TestStoreGetSorted(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "", false, Permanent, 2, 1) s.Create("/foo", "", false, Permanent)
s.Create("/foo/x", "0", false, Permanent, 3, 1) s.Create("/foo/x", "0", false, Permanent)
s.Create("/foo/z", "0", false, Permanent, 4, 1) s.Create("/foo/z", "0", false, Permanent)
s.Create("/foo/y", "", false, Permanent, 5, 1) s.Create("/foo/y", "", false, Permanent)
s.Create("/foo/y/a", "0", false, Permanent, 6, 1) s.Create("/foo/y/a", "0", false, Permanent)
s.Create("/foo/y/b", "0", false, Permanent, 7, 1) s.Create("/foo/y/b", "0", false, Permanent)
e, err := s.Get("/foo", true, true, 8, 1) e, err := s.Get("/foo", true, true)
assert.Nil(t, err, "") assert.Nil(t, err, "")
assert.Equal(t, e.KVPairs[0].Key, "/foo/x", "") assert.Equal(t, e.KVPairs[0].Key, "/foo/x", "")
assert.Equal(t, e.KVPairs[1].Key, "/foo/y", "") assert.Equal(t, e.KVPairs[1].Key, "/foo/y", "")
@ -87,7 +87,7 @@ func TestStoreGetSorted(t *testing.T) {
// Ensure that the store can create a new key if it doesn't already exist. // Ensure that the store can create a new key if it doesn't already exist.
func TestStoreCreateValue(t *testing.T) { func TestStoreCreateValue(t *testing.T) {
s := newStore() s := newStore()
e, err := s.Create("/foo", "bar", false, Permanent, 2, 1) e, err := s.Create("/foo", "bar", false, Permanent)
assert.Nil(t, err, "") assert.Nil(t, err, "")
assert.Equal(t, e.Action, "create", "") assert.Equal(t, e.Action, "create", "")
assert.Equal(t, e.Key, "/foo", "") assert.Equal(t, e.Key, "/foo", "")
@ -97,14 +97,13 @@ func TestStoreCreateValue(t *testing.T) {
assert.Nil(t, e.KVPairs, "") assert.Nil(t, e.KVPairs, "")
assert.Nil(t, e.Expiration, "") assert.Nil(t, e.Expiration, "")
assert.Equal(t, e.TTL, 0, "") assert.Equal(t, e.TTL, 0, "")
assert.Equal(t, e.Index, uint64(2), "") assert.Equal(t, e.ModifiedIndex, uint64(1), "")
assert.Equal(t, e.Term, uint64(1), "")
} }
// Ensure that the store can create a new directory if it doesn't already exist. // Ensure that the store can create a new directory if it doesn't already exist.
func TestStoreCreateDirectory(t *testing.T) { func TestStoreCreateDirectory(t *testing.T) {
s := newStore() s := newStore()
e, err := s.Create("/foo", "", false, Permanent, 2, 1) e, err := s.Create("/foo", "", false, Permanent)
assert.Nil(t, err, "") assert.Nil(t, err, "")
assert.Equal(t, e.Action, "create", "") assert.Equal(t, e.Action, "create", "")
assert.Equal(t, e.Key, "/foo", "") assert.Equal(t, e.Key, "/foo", "")
@ -114,22 +113,21 @@ func TestStoreCreateDirectory(t *testing.T) {
// Ensure that the store fails to create a key if it already exists. // Ensure that the store fails to create a key if it already exists.
func TestStoreCreateFailsIfExists(t *testing.T) { func TestStoreCreateFailsIfExists(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "", false, Permanent, 2, 1) s.Create("/foo", "", false, Permanent)
e, _err := s.Create("/foo", "", false, Permanent, 3, 1) e, _err := s.Create("/foo", "", false, Permanent)
err := _err.(*etcdErr.Error) err := _err.(*etcdErr.Error)
assert.Equal(t, err.ErrorCode, etcdErr.EcodeNodeExist, "") assert.Equal(t, err.ErrorCode, etcdErr.EcodeNodeExist, "")
assert.Equal(t, err.Message, "Already exists", "") assert.Equal(t, err.Message, "Already exists", "")
assert.Equal(t, err.Cause, "/foo", "") assert.Equal(t, err.Cause, "/foo", "")
assert.Equal(t, err.Index, uint64(3), "") assert.Equal(t, err.Index, uint64(1), "")
assert.Equal(t, err.Term, uint64(1), "")
assert.Nil(t, e, 0, "") assert.Nil(t, e, 0, "")
} }
// Ensure that the store can update a key if it already exists. // Ensure that the store can update a key if it already exists.
func TestStoreUpdateValue(t *testing.T) { func TestStoreUpdateValue(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "bar", false, Permanent, 2, 1) s.Create("/foo", "bar", false, Permanent)
e, err := s.Update("/foo", "baz", Permanent, 3, 1) e, err := s.Update("/foo", "baz", Permanent)
assert.Nil(t, err, "") assert.Nil(t, err, "")
assert.Equal(t, e.Action, "update", "") assert.Equal(t, e.Action, "update", "")
assert.Equal(t, e.Key, "/foo", "") assert.Equal(t, e.Key, "/foo", "")
@ -137,17 +135,16 @@ func TestStoreUpdateValue(t *testing.T) {
assert.Equal(t, e.PrevValue, "bar", "") assert.Equal(t, e.PrevValue, "bar", "")
assert.Equal(t, e.Value, "baz", "") assert.Equal(t, e.Value, "baz", "")
assert.Equal(t, e.TTL, 0, "") assert.Equal(t, e.TTL, 0, "")
assert.Equal(t, e.Index, uint64(3), "") assert.Equal(t, e.ModifiedIndex, uint64(2), "")
assert.Equal(t, e.Term, uint64(1), "") e, _ = s.Get("/foo", false, false)
e, _ = s.Get("/foo", false, false, 3, 1)
assert.Equal(t, e.Value, "baz", "") assert.Equal(t, e.Value, "baz", "")
} }
// Ensure that the store cannot update a directory. // Ensure that the store cannot update a directory.
func TestStoreUpdateFailsIfDirectory(t *testing.T) { func TestStoreUpdateFailsIfDirectory(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "", false, Permanent, 2, 1) s.Create("/foo", "", false, Permanent)
e, _err := s.Update("/foo", "baz", Permanent, 3, 1) e, _err := s.Update("/foo", "baz", Permanent)
err := _err.(*etcdErr.Error) err := _err.(*etcdErr.Error)
assert.Equal(t, err.ErrorCode, etcdErr.EcodeNotFile, "") assert.Equal(t, err.ErrorCode, etcdErr.EcodeNotFile, "")
assert.Equal(t, err.Message, "Not A File", "") assert.Equal(t, err.Message, "Not A File", "")
@ -158,13 +155,20 @@ func TestStoreUpdateFailsIfDirectory(t *testing.T) {
// Ensure that the store can update the TTL on a value. // Ensure that the store can update the TTL on a value.
func TestStoreUpdateValueTTL(t *testing.T) { func TestStoreUpdateValueTTL(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "bar", false, Permanent, 2, 1)
_, err := s.Update("/foo", "baz", time.Now().Add(1*time.Millisecond), 3, 1) c := make(chan bool)
e, _ := s.Get("/foo", false, false, 3, 1) defer func() {
c <- true
}()
go mockSyncService(s.DeleteExpiredKeys, c)
s.Create("/foo", "bar", false, Permanent)
_, err := s.Update("/foo", "baz", time.Now().Add(500*time.Millisecond))
e, _ := s.Get("/foo", false, false)
assert.Equal(t, e.Value, "baz", "") assert.Equal(t, e.Value, "baz", "")
time.Sleep(2 * time.Millisecond) time.Sleep(600 * time.Millisecond)
e, err = s.Get("/foo", false, false, 3, 1) e, err = s.Get("/foo", false, false)
assert.Nil(t, e, "") assert.Nil(t, e, "")
assert.Equal(t, err.(*etcdErr.Error).ErrorCode, etcdErr.EcodeKeyNotFound, "") assert.Equal(t, err.(*etcdErr.Error).ErrorCode, etcdErr.EcodeKeyNotFound, "")
} }
@ -172,14 +176,21 @@ func TestStoreUpdateValueTTL(t *testing.T) {
// Ensure that the store can update the TTL on a directory. // Ensure that the store can update the TTL on a directory.
func TestStoreUpdateDirTTL(t *testing.T) { func TestStoreUpdateDirTTL(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "", false, Permanent, 2, 1)
s.Create("/foo/bar", "baz", false, Permanent, 3, 1) c := make(chan bool)
_, err := s.Update("/foo", "", time.Now().Add(1*time.Millisecond), 3, 1) defer func() {
e, _ := s.Get("/foo/bar", false, false, 3, 1) c <- true
}()
go mockSyncService(s.DeleteExpiredKeys, c)
s.Create("/foo", "", false, Permanent)
s.Create("/foo/bar", "baz", false, Permanent)
_, err := s.Update("/foo", "", time.Now().Add(500*time.Millisecond))
e, _ := s.Get("/foo/bar", false, false)
assert.Equal(t, e.Value, "baz", "") assert.Equal(t, e.Value, "baz", "")
time.Sleep(2 * time.Millisecond) time.Sleep(600 * time.Millisecond)
e, err = s.Get("/foo/bar", false, false, 3, 1) e, err = s.Get("/foo/bar", false, false)
assert.Nil(t, e, "") assert.Nil(t, e, "")
assert.Equal(t, err.(*etcdErr.Error).ErrorCode, etcdErr.EcodeKeyNotFound, "") assert.Equal(t, err.(*etcdErr.Error).ErrorCode, etcdErr.EcodeKeyNotFound, "")
} }
@ -187,8 +198,8 @@ func TestStoreUpdateDirTTL(t *testing.T) {
// Ensure that the store can delete a value. // Ensure that the store can delete a value.
func TestStoreDeleteValue(t *testing.T) { func TestStoreDeleteValue(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "bar", false, Permanent, 2, 1) s.Create("/foo", "bar", false, Permanent)
e, err := s.Delete("/foo", false, 3, 1) e, err := s.Delete("/foo", false)
assert.Nil(t, err, "") assert.Nil(t, err, "")
assert.Equal(t, e.Action, "delete", "") assert.Equal(t, e.Action, "delete", "")
} }
@ -196,8 +207,8 @@ func TestStoreDeleteValue(t *testing.T) {
// Ensure that the store can delete a directory if recursive is specified. // Ensure that the store can delete a directory if recursive is specified.
func TestStoreDeleteDiretory(t *testing.T) { func TestStoreDeleteDiretory(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "", false, Permanent, 2, 1) s.Create("/foo", "", false, Permanent)
e, err := s.Delete("/foo", true, 3, 1) e, err := s.Delete("/foo", true)
assert.Nil(t, err, "") assert.Nil(t, err, "")
assert.Equal(t, e.Action, "delete", "") assert.Equal(t, e.Action, "delete", "")
} }
@ -205,8 +216,8 @@ func TestStoreDeleteDiretory(t *testing.T) {
// Ensure that the store cannot delete a directory if recursive is not specified. // Ensure that the store cannot delete a directory if recursive is not specified.
func TestStoreDeleteDiretoryFailsIfNonRecursive(t *testing.T) { func TestStoreDeleteDiretoryFailsIfNonRecursive(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "", false, Permanent, 2, 1) s.Create("/foo", "", false, Permanent)
e, _err := s.Delete("/foo", false, 3, 1) e, _err := s.Delete("/foo", false)
err := _err.(*etcdErr.Error) err := _err.(*etcdErr.Error)
assert.Equal(t, err.ErrorCode, etcdErr.EcodeNotFile, "") assert.Equal(t, err.ErrorCode, etcdErr.EcodeNotFile, "")
assert.Equal(t, err.Message, "Not A File", "") assert.Equal(t, err.Message, "Not A File", "")
@ -216,60 +227,60 @@ func TestStoreDeleteDiretoryFailsIfNonRecursive(t *testing.T) {
// Ensure that the store can conditionally update a key if it has a previous value. // Ensure that the store can conditionally update a key if it has a previous value.
func TestStoreCompareAndSwapPrevValue(t *testing.T) { func TestStoreCompareAndSwapPrevValue(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "bar", false, Permanent, 2, 1) s.Create("/foo", "bar", false, Permanent)
e, err := s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent, 3, 1) e, err := s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent)
assert.Nil(t, err, "") assert.Nil(t, err, "")
assert.Equal(t, e.Action, "compareAndSwap", "") assert.Equal(t, e.Action, "compareAndSwap", "")
assert.Equal(t, e.PrevValue, "bar", "") assert.Equal(t, e.PrevValue, "bar", "")
assert.Equal(t, e.Value, "baz", "") assert.Equal(t, e.Value, "baz", "")
e, _ = s.Get("/foo", false, false, 3, 1) e, _ = s.Get("/foo", false, false)
assert.Equal(t, e.Value, "baz", "") assert.Equal(t, e.Value, "baz", "")
} }
// Ensure that the store cannot conditionally update a key if it has the wrong previous value. // Ensure that the store cannot conditionally update a key if it has the wrong previous value.
func TestStoreCompareAndSwapPrevValueFailsIfNotMatch(t *testing.T) { func TestStoreCompareAndSwapPrevValueFailsIfNotMatch(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "bar", false, Permanent, 2, 1) s.Create("/foo", "bar", false, Permanent)
e, _err := s.CompareAndSwap("/foo", "wrong_value", 0, "baz", Permanent, 3, 1) e, _err := s.CompareAndSwap("/foo", "wrong_value", 0, "baz", Permanent)
err := _err.(*etcdErr.Error) err := _err.(*etcdErr.Error)
assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "") assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "")
assert.Equal(t, err.Message, "Test Failed", "") assert.Equal(t, err.Message, "Test Failed", "")
assert.Nil(t, e, "") assert.Nil(t, e, "")
e, _ = s.Get("/foo", false, false, 3, 1) e, _ = s.Get("/foo", false, false)
assert.Equal(t, e.Value, "bar", "") assert.Equal(t, e.Value, "bar", "")
} }
// Ensure that the store can conditionally update a key if it has a previous index. // Ensure that the store can conditionally update a key if it has a previous index.
func TestStoreCompareAndSwapPrevIndex(t *testing.T) { func TestStoreCompareAndSwapPrevIndex(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "bar", false, Permanent, 2, 1) s.Create("/foo", "bar", false, Permanent)
e, err := s.CompareAndSwap("/foo", "", 2, "baz", Permanent, 3, 1) e, err := s.CompareAndSwap("/foo", "", 1, "baz", Permanent)
assert.Nil(t, err, "") assert.Nil(t, err, "")
assert.Equal(t, e.Action, "compareAndSwap", "") assert.Equal(t, e.Action, "compareAndSwap", "")
assert.Equal(t, e.PrevValue, "bar", "") assert.Equal(t, e.PrevValue, "bar", "")
assert.Equal(t, e.Value, "baz", "") assert.Equal(t, e.Value, "baz", "")
e, _ = s.Get("/foo", false, false, 3, 1) e, _ = s.Get("/foo", false, false)
assert.Equal(t, e.Value, "baz", "") assert.Equal(t, e.Value, "baz", "")
} }
// Ensure that the store cannot conditionally update a key if it has the wrong previous index. // Ensure that the store cannot conditionally update a key if it has the wrong previous index.
func TestStoreCompareAndSwapPrevIndexFailsIfNotMatch(t *testing.T) { func TestStoreCompareAndSwapPrevIndexFailsIfNotMatch(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "bar", false, Permanent, 2, 1) s.Create("/foo", "bar", false, Permanent)
e, _err := s.CompareAndSwap("/foo", "", 100, "baz", Permanent, 3, 1) e, _err := s.CompareAndSwap("/foo", "", 100, "baz", Permanent)
err := _err.(*etcdErr.Error) err := _err.(*etcdErr.Error)
assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "") assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "")
assert.Equal(t, err.Message, "Test Failed", "") assert.Equal(t, err.Message, "Test Failed", "")
assert.Nil(t, e, "") assert.Nil(t, e, "")
e, _ = s.Get("/foo", false, false, 3, 1) e, _ = s.Get("/foo", false, false)
assert.Equal(t, e.Value, "bar", "") assert.Equal(t, e.Value, "bar", "")
} }
// Ensure that the store can watch for key creation. // Ensure that the store can watch for key creation.
func TestStoreWatchCreate(t *testing.T) { func TestStoreWatchCreate(t *testing.T) {
s := newStore() s := newStore()
c, _ := s.Watch("/foo", false, 0, 0, 1) c, _ := s.Watch("/foo", false, 0)
s.Create("/foo", "bar", false, Permanent, 2, 1) s.Create("/foo", "bar", false, Permanent)
e := nbselect(c) e := nbselect(c)
assert.Equal(t, e.Action, "create", "") assert.Equal(t, e.Action, "create", "")
assert.Equal(t, e.Key, "/foo", "") assert.Equal(t, e.Key, "/foo", "")
@ -280,8 +291,8 @@ func TestStoreWatchCreate(t *testing.T) {
// Ensure that the store can watch for recursive key creation. // Ensure that the store can watch for recursive key creation.
func TestStoreWatchRecursiveCreate(t *testing.T) { func TestStoreWatchRecursiveCreate(t *testing.T) {
s := newStore() s := newStore()
c, _ := s.Watch("/foo", true, 0, 0, 1) c, _ := s.Watch("/foo", true, 0)
s.Create("/foo/bar", "baz", false, Permanent, 2, 1) s.Create("/foo/bar", "baz", false, Permanent)
e := nbselect(c) e := nbselect(c)
assert.Equal(t, e.Action, "create", "") assert.Equal(t, e.Action, "create", "")
assert.Equal(t, e.Key, "/foo/bar", "") assert.Equal(t, e.Key, "/foo/bar", "")
@ -290,9 +301,9 @@ func TestStoreWatchRecursiveCreate(t *testing.T) {
// Ensure that the store can watch for key updates. // Ensure that the store can watch for key updates.
func TestStoreWatchUpdate(t *testing.T) { func TestStoreWatchUpdate(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "bar", false, Permanent, 2, 1) s.Create("/foo", "bar", false, Permanent)
c, _ := s.Watch("/foo", false, 0, 0, 1) c, _ := s.Watch("/foo", false, 0)
s.Update("/foo", "baz", Permanent, 3, 1) s.Update("/foo", "baz", Permanent)
e := nbselect(c) e := nbselect(c)
assert.Equal(t, e.Action, "update", "") assert.Equal(t, e.Action, "update", "")
assert.Equal(t, e.Key, "/foo", "") assert.Equal(t, e.Key, "/foo", "")
@ -301,9 +312,9 @@ func TestStoreWatchUpdate(t *testing.T) {
// Ensure that the store can watch for recursive key updates. // Ensure that the store can watch for recursive key updates.
func TestStoreWatchRecursiveUpdate(t *testing.T) { func TestStoreWatchRecursiveUpdate(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo/bar", "baz", false, Permanent, 2, 1) s.Create("/foo/bar", "baz", false, Permanent)
c, _ := s.Watch("/foo", true, 0, 0, 1) c, _ := s.Watch("/foo", true, 0)
s.Update("/foo/bar", "baz", Permanent, 3, 1) s.Update("/foo/bar", "baz", Permanent)
e := nbselect(c) e := nbselect(c)
assert.Equal(t, e.Action, "update", "") assert.Equal(t, e.Action, "update", "")
assert.Equal(t, e.Key, "/foo/bar", "") assert.Equal(t, e.Key, "/foo/bar", "")
@ -312,9 +323,9 @@ func TestStoreWatchRecursiveUpdate(t *testing.T) {
// Ensure that the store can watch for key deletions. // Ensure that the store can watch for key deletions.
func TestStoreWatchDelete(t *testing.T) { func TestStoreWatchDelete(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "bar", false, Permanent, 2, 1) s.Create("/foo", "bar", false, Permanent)
c, _ := s.Watch("/foo", false, 0, 0, 1) c, _ := s.Watch("/foo", false, 0)
s.Delete("/foo", false, 3, 1) s.Delete("/foo", false)
e := nbselect(c) e := nbselect(c)
assert.Equal(t, e.Action, "delete", "") assert.Equal(t, e.Action, "delete", "")
assert.Equal(t, e.Key, "/foo", "") assert.Equal(t, e.Key, "/foo", "")
@ -323,9 +334,9 @@ func TestStoreWatchDelete(t *testing.T) {
// Ensure that the store can watch for recursive key deletions. // Ensure that the store can watch for recursive key deletions.
func TestStoreWatchRecursiveDelete(t *testing.T) { func TestStoreWatchRecursiveDelete(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo/bar", "baz", false, Permanent, 2, 1) s.Create("/foo/bar", "baz", false, Permanent)
c, _ := s.Watch("/foo", true, 0, 0, 1) c, _ := s.Watch("/foo", true, 0)
s.Delete("/foo/bar", false, 3, 1) s.Delete("/foo/bar", false)
e := nbselect(c) e := nbselect(c)
assert.Equal(t, e.Action, "delete", "") assert.Equal(t, e.Action, "delete", "")
assert.Equal(t, e.Key, "/foo/bar", "") assert.Equal(t, e.Key, "/foo/bar", "")
@ -334,9 +345,9 @@ func TestStoreWatchRecursiveDelete(t *testing.T) {
// Ensure that the store can watch for CAS updates. // Ensure that the store can watch for CAS updates.
func TestStoreWatchCompareAndSwap(t *testing.T) { func TestStoreWatchCompareAndSwap(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "bar", false, Permanent, 2, 1) s.Create("/foo", "bar", false, Permanent)
c, _ := s.Watch("/foo", false, 0, 0, 1) c, _ := s.Watch("/foo", false, 0)
s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent, 3, 1) s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent)
e := nbselect(c) e := nbselect(c)
assert.Equal(t, e.Action, "compareAndSwap", "") assert.Equal(t, e.Action, "compareAndSwap", "")
assert.Equal(t, e.Key, "/foo", "") assert.Equal(t, e.Key, "/foo", "")
@ -345,9 +356,9 @@ func TestStoreWatchCompareAndSwap(t *testing.T) {
// Ensure that the store can watch for recursive CAS updates. // Ensure that the store can watch for recursive CAS updates.
func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) { func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo/bar", "baz", false, Permanent, 2, 1) s.Create("/foo/bar", "baz", false, Permanent)
c, _ := s.Watch("/foo", true, 0, 0, 1) c, _ := s.Watch("/foo", true, 0)
s.CompareAndSwap("/foo/bar", "baz", 0, "bat", Permanent, 3, 1) s.CompareAndSwap("/foo/bar", "baz", 0, "bat", Permanent)
e := nbselect(c) e := nbselect(c)
assert.Equal(t, e.Action, "compareAndSwap", "") assert.Equal(t, e.Action, "compareAndSwap", "")
assert.Equal(t, e.Key, "/foo/bar", "") assert.Equal(t, e.Key, "/foo/bar", "")
@ -356,32 +367,45 @@ func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) {
// Ensure that the store can watch for key expiration. // Ensure that the store can watch for key expiration.
func TestStoreWatchExpire(t *testing.T) { func TestStoreWatchExpire(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "bar", false, time.Now().Add(1*time.Millisecond), 2, 1)
c, _ := s.Watch("/foo", false, 0, 0, 1) stopChan := make(chan bool)
defer func() {
stopChan <- true
}()
go mockSyncService(s.DeleteExpiredKeys, stopChan)
s.Create("/foo", "bar", false, time.Now().Add(500*time.Millisecond))
s.Create("/foofoo", "barbarbar", false, time.Now().Add(500*time.Millisecond))
c, _ := s.Watch("/", true, 0)
e := nbselect(c) e := nbselect(c)
assert.Nil(t, e, "") assert.Nil(t, e, "")
time.Sleep(2 * time.Millisecond) time.Sleep(600 * time.Millisecond)
e = nbselect(c) e = nbselect(c)
assert.Equal(t, e.Action, "expire", "") assert.Equal(t, e.Action, "expire", "")
assert.Equal(t, e.Key, "/foo", "") assert.Equal(t, e.Key, "/foo", "")
c, _ = s.Watch("/", true, 4)
e = nbselect(c)
assert.Equal(t, e.Action, "expire", "")
assert.Equal(t, e.Key, "/foofoo", "")
} }
// Ensure that the store can recover from a previously saved state. // Ensure that the store can recover from a previously saved state.
func TestStoreRecover(t *testing.T) { func TestStoreRecover(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "", false, Permanent, 2, 1) s.Create("/foo", "", false, Permanent)
s.Create("/foo/x", "bar", false, Permanent, 3, 1) s.Create("/foo/x", "bar", false, Permanent)
s.Create("/foo/y", "baz", false, Permanent, 4, 1) s.Create("/foo/y", "baz", false, Permanent)
b, err := s.Save() b, err := s.Save()
s2 := newStore() s2 := newStore()
s2.Recovery(b) s2.Recovery(b)
e, err := s.Get("/foo/x", false, false, 4, 1) e, err := s.Get("/foo/x", false, false)
assert.Nil(t, err, "") assert.Nil(t, err, "")
assert.Equal(t, e.Value, "bar", "") assert.Equal(t, e.Value, "bar", "")
e, err = s.Get("/foo/y", false, false, 4, 1) e, err = s.Get("/foo/y", false, false)
assert.Nil(t, err, "") assert.Nil(t, err, "")
assert.Equal(t, e.Value, "baz", "") assert.Equal(t, e.Value, "baz", "")
} }
@ -389,21 +413,37 @@ func TestStoreRecover(t *testing.T) {
// Ensure that the store can recover from a previously saved state that includes an expiring key. // Ensure that the store can recover from a previously saved state that includes an expiring key.
func TestStoreRecoverWithExpiration(t *testing.T) { func TestStoreRecoverWithExpiration(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "", false, Permanent, 2, 1)
s.Create("/foo/x", "bar", false, Permanent, 3, 1) c := make(chan bool)
s.Create("/foo/y", "baz", false, time.Now().Add(5*time.Millisecond), 4, 1) defer func() {
c <- true
}()
go mockSyncService(s.DeleteExpiredKeys, c)
s.Create("/foo", "", false, Permanent)
s.Create("/foo/x", "bar", false, Permanent)
s.Create("/foo/y", "baz", false, time.Now().Add(5*time.Millisecond))
b, err := s.Save() b, err := s.Save()
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
s2 := newStore() s2 := newStore()
c2 := make(chan bool)
defer func() {
c2 <- true
}()
go mockSyncService(s2.DeleteExpiredKeys, c2)
s2.Recovery(b) s2.Recovery(b)
e, err := s.Get("/foo/x", false, false, 4, 1) time.Sleep(600 * time.Millisecond)
e, err := s.Get("/foo/x", false, false)
assert.Nil(t, err, "") assert.Nil(t, err, "")
assert.Equal(t, e.Value, "bar", "") assert.Equal(t, e.Value, "bar", "")
e, err = s.Get("/foo/y", false, false, 4, 1) e, err = s.Get("/foo/y", false, false)
assert.NotNil(t, err, "") assert.NotNil(t, err, "")
assert.Nil(t, e, "") assert.Nil(t, e, "")
} }
@ -417,3 +457,15 @@ func nbselect(c <-chan *Event) *Event {
return nil return nil
} }
} }
func mockSyncService(f func(now time.Time), c chan bool) {
ticker := time.Tick(time.Millisecond * 500)
for {
select {
case <-c:
return
case now := <-ticker:
f(now)
}
}
}

81
store/ttl_key_heap.go Normal file
View File

@ -0,0 +1,81 @@
package store
import (
"container/heap"
)
// An TTLKeyHeap is a min-heap of TTLKeys order by expiration time
type ttlKeyHeap struct {
array []*Node
keyMap map[*Node]int
}
func newTtlKeyHeap() *ttlKeyHeap {
h := &ttlKeyHeap{keyMap: make(map[*Node]int)}
heap.Init(h)
return h
}
func (h ttlKeyHeap) Len() int {
return len(h.array)
}
func (h ttlKeyHeap) Less(i, j int) bool {
return h.array[i].ExpireTime.Before(h.array[j].ExpireTime)
}
func (h ttlKeyHeap) Swap(i, j int) {
// swap node
h.array[i], h.array[j] = h.array[j], h.array[i]
// update map
h.keyMap[h.array[i]] = i
h.keyMap[h.array[j]] = j
}
func (h *ttlKeyHeap) Push(x interface{}) {
n, _ := x.(*Node)
h.keyMap[n] = len(h.array)
h.array = append(h.array, n)
}
func (h *ttlKeyHeap) Pop() interface{} {
old := h.array
n := len(old)
x := old[n-1]
h.array = old[0 : n-1]
delete(h.keyMap, x)
return x
}
func (h *ttlKeyHeap) top() *Node {
if h.Len() != 0 {
return h.array[0]
}
return nil
}
func (h *ttlKeyHeap) pop() *Node {
x := heap.Pop(h)
n, _ := x.(*Node)
return n
}
func (h *ttlKeyHeap) push(x interface{}) {
heap.Push(h, x)
}
func (h *ttlKeyHeap) update(n *Node) {
index, ok := h.keyMap[n]
if ok {
heap.Remove(h, index)
heap.Push(h, n)
}
}
func (h *ttlKeyHeap) remove(n *Node) {
index, ok := h.keyMap[n]
if ok {
heap.Remove(h, index)
}
}

View File

@ -71,3 +71,9 @@ func (f *CommandFactory) CreateCompareAndSwapCommand(key string, value string, p
ExpireTime: expireTime, ExpireTime: expireTime,
} }
} }
func (f *CommandFactory) CreateSyncCommand(now time.Time) raft.Command {
return &SyncCommand{
Time: time.Now(),
}
}

View File

@ -30,8 +30,7 @@ func (c *CompareAndSwapCommand) CommandName() string {
func (c *CompareAndSwapCommand) Apply(server raft.Server) (interface{}, error) { func (c *CompareAndSwapCommand) Apply(server raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(store.Store) s, _ := server.StateMachine().(store.Store)
e, err := s.CompareAndSwap(c.Key, c.PrevValue, c.PrevIndex, e, err := s.CompareAndSwap(c.Key, c.PrevValue, c.PrevIndex, c.Value, c.ExpireTime)
c.Value, c.ExpireTime, server.CommitIndex(), server.Term())
if err != nil { if err != nil {
log.Debug(err) log.Debug(err)

View File

@ -29,7 +29,7 @@ func (c *CreateCommand) CommandName() string {
func (c *CreateCommand) Apply(server raft.Server) (interface{}, error) { func (c *CreateCommand) Apply(server raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(store.Store) s, _ := server.StateMachine().(store.Store)
e, err := s.Create(c.Key, c.Value, c.Unique, c.ExpireTime, server.CommitIndex(), server.Term()) e, err := s.Create(c.Key, c.Value, c.Unique, c.ExpireTime)
if err != nil { if err != nil {
log.Debug(err) log.Debug(err)

View File

@ -1,8 +1,8 @@
package v2 package v2
import ( import (
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/log" "github.com/coreos/etcd/log"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft" "github.com/coreos/go-raft"
) )
@ -25,7 +25,7 @@ func (c *DeleteCommand) CommandName() string {
func (c *DeleteCommand) Apply(server raft.Server) (interface{}, error) { func (c *DeleteCommand) Apply(server raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(store.Store) s, _ := server.StateMachine().(store.Store)
e, err := s.Delete(c.Key, c.Recursive, server.CommitIndex(), server.Term()) e, err := s.Delete(c.Key, c.Recursive)
if err != nil { if err != nil {
log.Debug(err) log.Debug(err)

View File

@ -29,7 +29,7 @@ func (c *SetCommand) Apply(server raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(store.Store) s, _ := server.StateMachine().(store.Store)
// create a new node or replace the old node. // create a new node or replace the old node.
e, err := s.Set(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) e, err := s.Set(c.Key, c.Value, c.ExpireTime)
if err != nil { if err != nil {
log.Debug(err) log.Debug(err)

28
store/v2/sync_command.go Normal file
View File

@ -0,0 +1,28 @@
package v2
import (
"time"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
)
func init() {
raft.RegisterCommand(&SyncCommand{})
}
type SyncCommand struct {
Time time.Time `json:"time"`
}
// The name of the Sync command in the log
func (c SyncCommand) CommandName() string {
return "etcd:sync"
}
func (c SyncCommand) Apply(server raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(store.Store)
s.DeleteExpiredKeys(c.Time)
return nil, nil
}

View File

@ -27,7 +27,7 @@ func (c *UpdateCommand) CommandName() string {
func (c *UpdateCommand) Apply(server raft.Server) (interface{}, error) { func (c *UpdateCommand) Apply(server raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(store.Store) s, _ := server.StateMachine().(store.Store)
e, err := s.Update(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) e, err := s.Update(c.Key, c.Value, c.ExpireTime)
if err != nil { if err != nil {
log.Debug(err) log.Debug(err)

View File

@ -40,8 +40,7 @@ func (w *watcher) notify(e *Event, originalPath bool, deleted bool) bool {
// at the file we need to delete. // at the file we need to delete.
// For example a watcher is watching at "/foo/bar". And we deletes "/foo". The watcher // For example a watcher is watching at "/foo/bar". And we deletes "/foo". The watcher
// should get notified even if "/foo" is not the path it is watching. // should get notified even if "/foo" is not the path it is watching.
if (w.recursive || originalPath || deleted) && e.Index() >= w.sinceIndex {
if (w.recursive || originalPath || deleted) && e.Index >= w.sinceIndex {
w.eventChan <- e w.eventChan <- e
return true return true
} }

View File

@ -37,23 +37,24 @@ func newWatchHub(capacity int) *watcherHub {
// If recursive is false, the first change after index at prefix will be sent to the event channel. // If recursive is false, the first change after index at prefix will be sent to the event channel.
// If index is zero, watch will start from the current index + 1. // If index is zero, watch will start from the current index + 1.
func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan *Event, *etcdErr.Error) { func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan *Event, *etcdErr.Error) {
eventChan := make(chan *Event, 1) event, err := wh.EventHistory.scan(prefix, index)
e, err := wh.EventHistory.scan(prefix, index)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if e != nil { eventChan := make(chan *Event, 1) // use a buffered channel
eventChan <- e
if event != nil {
eventChan <- event
return eventChan, nil return eventChan, nil
} }
w := &watcher{ w := &watcher{
eventChan: eventChan, eventChan: eventChan,
recursive: recursive, recursive: recursive,
sinceIndex: index - 1, // to catch Expire() sinceIndex: index,
} }
l, ok := wh.watchers[prefix] l, ok := wh.watchers[prefix]
@ -93,19 +94,16 @@ func (wh *watcherHub) notify(e *Event) {
func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) { func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) {
l, ok := wh.watchers[path] l, ok := wh.watchers[path]
if ok { if ok {
curr := l.Front() curr := l.Front()
notifiedAll := true
for { for {
if curr == nil { // we have reached the end of the list if curr == nil { // we have reached the end of the list
if notifiedAll { if l.Len() == 0 {
// if we have notified all watcher in the list // if we have notified all watcher in the list
// we can delete the list // we can delete the list
delete(wh.watchers, path) delete(wh.watchers, path)
} }
break break
} }
@ -114,16 +112,13 @@ func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) {
w, _ := curr.Value.(*watcher) w, _ := curr.Value.(*watcher)
if w.notify(e, e.Key == path, deleted) { if w.notify(e, e.Key == path, deleted) {
// if we successfully notify a watcher // if we successfully notify a watcher
// we need to remove the watcher from the list // we need to remove the watcher from the list
// and decrease the counter // and decrease the counter
l.Remove(curr) l.Remove(curr)
atomic.AddInt64(&wh.count, -1) atomic.AddInt64(&wh.count, -1)
} else {
// once there is a watcher in the list is not interested
// in the event, we should keep the list in the map
notifiedAll = false
} }
curr = next // update current to the next curr = next // update current to the next

View File

@ -35,7 +35,7 @@ func TestWatcher(t *testing.T) {
// do nothing // do nothing
} }
e := newEvent(Create, "/foo/bar", 1, 1) e := newEvent(Create, "/foo/bar", 1)
wh.notify(e) wh.notify(e)
@ -47,7 +47,7 @@ func TestWatcher(t *testing.T) {
c, _ = wh.watch("/foo", false, 2) c, _ = wh.watch("/foo", false, 2)
e = newEvent(Create, "/foo/bar", 2, 1) e = newEvent(Create, "/foo/bar", 2)
wh.notify(e) wh.notify(e)
@ -58,7 +58,7 @@ func TestWatcher(t *testing.T) {
// do nothing // do nothing
} }
e = newEvent(Create, "/foo", 3, 1) e = newEvent(Create, "/foo", 3)
wh.notify(e) wh.notify(e)

View File

@ -65,8 +65,7 @@ func TestMultiNodeKillAllAndRecovery(t *testing.T) {
t.Fatalf("Recovery error: %s", err) t.Fatalf("Recovery error: %s", err)
} }
if result.Index != 18 { if result.Index != 16 {
t.Fatalf("recovery failed! [%d/18]", result.Index) t.Fatalf("recovery failed! [%d/16]", result.Index)
} }
} }

View File

@ -1,6 +1,7 @@
package test package test
import ( import (
"fmt"
"net/http" "net/http"
"os" "os"
"testing" "testing"
@ -31,6 +32,7 @@ func TestRemoveNode(t *testing.T) {
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
client.Do(rmReq) client.Do(rmReq)
fmt.Println("send remove to node3 and wait for its exiting")
etcds[2].Wait() etcds[2].Wait()
resp, err := c.Get("_etcd/machines") resp, err := c.Get("_etcd/machines")
@ -71,6 +73,7 @@ func TestRemoveNode(t *testing.T) {
// first kill the node, then remove it, then add it back // first kill the node, then remove it, then add it back
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
etcds[2].Kill() etcds[2].Kill()
fmt.Println("kill node3 and wait for its exiting")
etcds[2].Wait() etcds[2].Wait()
client.Do(rmReq) client.Do(rmReq)

View File

@ -3,6 +3,7 @@ package test
import ( import (
"io/ioutil" "io/ioutil"
"os" "os"
"strconv"
"testing" "testing"
"time" "time"
@ -52,8 +53,10 @@ func TestSimpleSnapshot(t *testing.T) {
t.Fatal("wrong number of snapshot :[1/", len(snapshots), "]") t.Fatal("wrong number of snapshot :[1/", len(snapshots), "]")
} }
if snapshots[0].Name() != "0_503.ss" { index, _ := strconv.Atoi(snapshots[0].Name()[2:5])
t.Fatal("wrong name of snapshot :[0_503.ss/", snapshots[0].Name(), "]")
if index < 507 || index > 510 {
t.Fatal("wrong name of snapshot :", snapshots[0].Name())
} }
// issue second 501 commands // issue second 501 commands
@ -82,7 +85,9 @@ func TestSimpleSnapshot(t *testing.T) {
t.Fatal("wrong number of snapshot :[1/", len(snapshots), "]") t.Fatal("wrong number of snapshot :[1/", len(snapshots), "]")
} }
if snapshots[0].Name() != "0_1004.ss" { index, _ = strconv.Atoi(snapshots[0].Name()[2:6])
t.Fatal("wrong name of snapshot :[0_1004.ss/", snapshots[0].Name(), "]")
if index < 1015 || index > 1018 {
t.Fatal("wrong name of snapshot :", snapshots[0].Name())
} }
} }