From 811d172a545c0d6d8f3b0e07068d2435523739f4 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 14 Oct 2013 21:22:20 -0700 Subject: [PATCH 01/12] fix change wait_index to waitIndex; we do not use post in 0.2 --- server/v2/get_key_handler.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/server/v2/get_key_handler.go b/server/v2/get_key_handler.go index e6cce6e8b..a7f8e56a1 100644 --- a/server/v2/get_key_handler.go +++ b/server/v2/get_key_handler.go @@ -25,7 +25,7 @@ func GetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { leader := s.Leader() hostname, _ := s.PeerURL(leader) url := hostname + req.URL.Path - log.Debugf("Redirect to %s", url) + log.Debugf("Redirect consistent get to %s", url) http.Redirect(w, req, url, http.StatusTemporaryRedirect) return nil } @@ -36,8 +36,10 @@ func GetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { if req.FormValue("wait") == "true" { // watch // Create a command to watch from a given index (default 0). var sinceIndex uint64 = 0 - if req.Method == "POST" { - sinceIndex, err = strconv.ParseUint(string(req.FormValue("wait_index")), 10, 64) + + waitIndex := req.FormValue("waitIndex") + if waitIndex != "" { + sinceIndex, err = strconv.ParseUint(string(req.FormValue("waitIndex")), 10, 64) if err != nil { return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", store.UndefIndex, store.UndefTerm) } From 545f8ed6a183c48e156a0084ed2951a3c3ae34a1 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 14 Oct 2013 22:22:23 -0700 Subject: [PATCH 02/12] fix update PUT handler --- server/server.go | 3 +- server/v1/set_key_handler.go | 2 +- server/v2/update_key_handler.go | 59 ++++++++---- ...command.go => compare_and_swap_command.go} | 14 +-- store/stats_test.go | 4 +- store/store.go | 89 +++++++++---------- store/store_test.go | 24 ++--- store/update_command.go | 38 -------- 8 files changed, 109 insertions(+), 124 deletions(-) rename store/{test_and_set_command.go => compare_and_swap_command.go} (61%) delete mode 100644 store/update_command.go diff --git a/server/server.go b/server/server.go index d1b1abf0f..0aeb5dbc4 100644 --- a/server/server.go +++ b/server/server.go @@ -254,10 +254,11 @@ func (s *Server) SpeedTestHandler(w http.ResponseWriter, req *http.Request) erro for i := 0; i < count; i++ { go func() { for j := 0; j < 10; j++ { - c := &store.UpdateCommand{ + c := &store.CreateCommand{ Key: "foo", Value: "bar", ExpireTime: time.Unix(0, 0), + Force: true, } s.peerServer.RaftServer().Do(c) } diff --git a/server/v1/set_key_handler.go b/server/v1/set_key_handler.go index 03b6d7f9b..acd4037c1 100644 --- a/server/v1/set_key_handler.go +++ b/server/v1/set_key_handler.go @@ -31,7 +31,7 @@ func SetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { // If the "prevValue" is specified then test-and-set. Otherwise create a new key. var c raft.Command if prevValueArr, ok := req.Form["prevValue"]; ok && len(prevValueArr) > 0 { - c = &store.TestAndSetCommand{ + c = &store.CompareAndSwapCommand{ Key: key, Value: value, PrevValue: prevValueArr[0], diff --git a/server/v2/update_key_handler.go b/server/v2/update_key_handler.go index 841c8828d..60c260afc 100644 --- a/server/v2/update_key_handler.go +++ b/server/v2/update_key_handler.go @@ -29,35 +29,58 @@ func UpdateKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error prevValue, valueOk := req.Form["prevValue"] prevIndexStr, indexOk := req.Form["prevIndex"] + prevExist, existOk := req.Form["prevExist"] var c raft.Command - if !valueOk && !indexOk { // update without test - c = &store.UpdateCommand{ + + // Set command: create a new node or replace the old one. + if !valueOk && !indexOk && !existOk { + c = &store.CreateCommand{ Key: key, Value: value, ExpireTime: expireTime, + Force: true, } + return s.Dispatch(c, w, req) + } - } else { // update with test - var prevIndex uint64 - - if indexOk { - prevIndex, err = strconv.ParseUint(prevIndexStr[0], 10, 64) - - // bad previous index - if err != nil { - return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Update", store.UndefIndex, store.UndefTerm) + // update with test + if existOk { + if prevExist[0] == "false" { + // Create command: create a new node. Fail, if a node already exists + // Ignore prevIndex and prevValue + c = &store.CreateCommand{ + Key: key, + Value: value, + ExpireTime: expireTime, } - } else { - prevIndex = 0 } + } - c = &store.TestAndSetCommand{ - Key: key, - Value: value, - PrevValue: prevValue[0], - PrevIndex: prevIndex, + var prevIndex uint64 + + if indexOk { + prevIndex, err = strconv.ParseUint(prevIndexStr[0], 10, 64) + + // bad previous index + if err != nil { + return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndSwap", store.UndefIndex, store.UndefTerm) } + } else { + prevIndex = 0 + } + + if valueOk { + if prevValue[0] == "" { + return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndSwap", store.UndefIndex, store.UndefTerm) + } + } + + c = &store.CompareAndSwapCommand{ + Key: key, + Value: value, + PrevValue: prevValue[0], + PrevIndex: prevIndex, } return s.Dispatch(c, w, req) diff --git a/store/test_and_set_command.go b/store/compare_and_swap_command.go similarity index 61% rename from store/test_and_set_command.go rename to store/compare_and_swap_command.go index 03cb1879a..c4bfee569 100644 --- a/store/test_and_set_command.go +++ b/store/compare_and_swap_command.go @@ -8,11 +8,11 @@ import ( ) func init() { - raft.RegisterCommand(&TestAndSetCommand{}) + raft.RegisterCommand(&CompareAndSwapCommand{}) } -// The TestAndSetCommand performs a conditional update on a key in the store. -type TestAndSetCommand struct { +// The CompareAndSwap performs a conditional update on a key in the store. +type CompareAndSwapCommand struct { Key string `json:"key"` Value string `json:"value"` ExpireTime time.Time `json:"expireTime"` @@ -21,15 +21,15 @@ type TestAndSetCommand struct { } // The name of the testAndSet command in the log -func (c *TestAndSetCommand) CommandName() string { - return "etcd:testAndSet" +func (c *CompareAndSwapCommand) CommandName() string { + return "etcd:compareAndSwap" } // Set the key-value pair if the current value of the key equals to the given prevValue -func (c *TestAndSetCommand) Apply(server raft.Server) (interface{}, error) { +func (c *CompareAndSwapCommand) Apply(server raft.Server) (interface{}, error) { s, _ := server.StateMachine().(Store) - e, err := s.TestAndSet(c.Key, c.PrevValue, c.PrevIndex, + e, err := s.CompareAndSwap(c.Key, c.PrevValue, c.PrevIndex, c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) if err != nil { diff --git a/store/stats_test.go b/store/stats_test.go index 44d1a8999..fa6483ac2 100644 --- a/store/stats_test.go +++ b/store/stats_test.go @@ -37,7 +37,7 @@ func TestBasicStats(t *testing.T) { for _, k := range keys { i++ - _, err := s.Update(k, "foo", time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1) + _, err := s.update(k, "foo", time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1) if err != nil { UpdateFail++ } else { @@ -58,7 +58,7 @@ func TestBasicStats(t *testing.T) { for _, k := range keys { i++ - _, err := s.TestAndSet(k, "foo", 0, "bar", Permanent, i, 1) + _, err := s.CompareAndSwap(k, "foo", 0, "bar", Permanent, i, 1) if err != nil { TestAndSetFail++ } else { diff --git a/store/store.go b/store/store.go index d063aa6ba..f25b4345e 100644 --- a/store/store.go +++ b/store/store.go @@ -17,8 +17,7 @@ type Store interface { Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error) Create(nodePath string, value string, incrementalSuffix bool, force bool, expireTime time.Time, index uint64, term uint64) (*Event, error) - Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error) - TestAndSet(nodePath string, prevValue string, prevIndex uint64, + CompareAndSwap(nodePath string, prevValue string, prevIndex uint64, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error) Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error) @@ -116,47 +115,7 @@ func (s *store) Create(nodePath string, value string, incrementalSuffix bool, fo return s.internalCreate(nodePath, value, incrementalSuffix, force, expireTime, index, term, Create) } -// Update function updates the value/ttl of the node. -// If the node is a file, the value and the ttl can be updated. -// If the node is a directory, only the ttl can be updated. -func (s *store) Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error) { - s.worldLock.Lock() - defer s.worldLock.Unlock() - nodePath = path.Clean(path.Join("/", nodePath)) - - n, err := s.internalGet(nodePath, index, term) - - if err != nil { // if the node does not exist, return error - s.Stats.Inc(UpdateFail) - return nil, err - } - - e := newEvent(Update, nodePath, s.Index, s.Term) - - if len(newValue) != 0 { - if n.IsDir() { - // if the node is a directory, we cannot update value - s.Stats.Inc(UpdateFail) - return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term) - } - - e.PrevValue = n.Value - n.Write(newValue, index, term) - } - - // update ttl - n.UpdateTTL(expireTime) - - e.Expiration, e.TTL = n.ExpirationAndTTL() - - s.WatcherHub.notify(e) - - s.Stats.Inc(UpdateSuccess) - - return e, nil -} - -func (s *store) TestAndSet(nodePath string, prevValue string, prevIndex uint64, +func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { nodePath = path.Clean(path.Join("/", nodePath)) @@ -164,8 +123,8 @@ func (s *store) TestAndSet(nodePath string, prevValue string, prevIndex uint64, s.worldLock.Lock() defer s.worldLock.Unlock() - if prevValue == "" && prevIndex == 0 { // try create - return s.internalCreate(nodePath, value, false, false, expireTime, index, term, TestAndSet) + if prevValue == "" && prevIndex == 0 { // try just update + return s.update(nodePath, value, expireTime, index, term) } n, err := s.internalGet(nodePath, index, term) @@ -293,6 +252,46 @@ func (s *store) walk(nodePath string, walkFunc func(prev *Node, component string return curr, nil } +// Update function updates the value/ttl of the node. +// If the node is a file, the value and the ttl can be updated. +// If the node is a directory, only the ttl can be updated. +func (s *store) update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error) { + s.worldLock.Lock() + defer s.worldLock.Unlock() + nodePath = path.Clean(path.Join("/", nodePath)) + + n, err := s.internalGet(nodePath, index, term) + + if err != nil { // if the node does not exist, return error + s.Stats.Inc(UpdateFail) + return nil, err + } + + e := newEvent(Update, nodePath, s.Index, s.Term) + + if len(newValue) != 0 { + if n.IsDir() { + // if the node is a directory, we cannot update value + s.Stats.Inc(UpdateFail) + return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term) + } + + e.PrevValue = n.Value + n.Write(newValue, index, term) + } + + // update ttl + n.UpdateTTL(expireTime) + + e.Expiration, e.TTL = n.ExpirationAndTTL() + + s.WatcherHub.notify(e) + + s.Stats.Inc(UpdateSuccess) + + return e, nil +} + func (s *store) internalCreate(nodePath string, value string, incrementalSuffix bool, force bool, expireTime time.Time, index uint64, term uint64, action string) (*Event, error) { diff --git a/store/store_test.go b/store/store_test.go index 958d99007..457a9b5f5 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -74,7 +74,7 @@ func TestUpdateFile(t *testing.T) { t.Fatalf("cannot create %s=bar [%s]", "/foo/bar", err.Error()) } - _, err = s.Update("/foo/bar", "barbar", Permanent, 2, 1) + _, err = s.update("/foo/bar", "barbar", Permanent, 2, 1) if err != nil { t.Fatalf("cannot update %s=barbar [%s]", "/foo/bar", err.Error()) @@ -114,7 +114,7 @@ func TestUpdateFile(t *testing.T) { } expire := time.Now().Add(time.Second * 2) - _, err = s.Update("/foo/foo", "", expire, 7, 1) + _, err = s.update("/foo/foo", "", expire, 7, 1) if err != nil { t.Fatalf("cannot update dir [%s] [%s]", "/foo/foo", err.Error()) } @@ -286,18 +286,18 @@ func TestExpire(t *testing.T) { } } -func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ? +func TestCompareAndSwap(t *testing.T) { // TODO prevValue == nil ? s := newStore() s.Create("/foo", "bar", false, false, Permanent, 1, 1) // test on wrong previous value - _, err := s.TestAndSet("/foo", "barbar", 0, "car", Permanent, 2, 1) + _, err := s.CompareAndSwap("/foo", "barbar", 0, "car", Permanent, 2, 1) if err == nil { t.Fatal("test and set should fail barbar != bar") } // test on value - e, err := s.TestAndSet("/foo", "bar", 0, "car", Permanent, 3, 1) + e, err := s.CompareAndSwap("/foo", "bar", 0, "car", Permanent, 3, 1) if err != nil { t.Fatal("test and set should succeed bar == bar") @@ -308,7 +308,7 @@ func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ? } // test on index - e, err = s.TestAndSet("/foo", "", 3, "bar", Permanent, 4, 1) + e, err = s.CompareAndSwap("/foo", "", 3, "bar", Permanent, 4, 1) if err != nil { t.Fatal("test and set should succeed index 3 == 3") @@ -331,14 +331,14 @@ func TestWatch(t *testing.T) { } c, _ = s.Watch("/foo/foo/foo", false, 0, 1, 1) - s.Update("/foo/foo/foo", "car", Permanent, 2, 1) + s.update("/foo/foo/foo", "car", Permanent, 2, 1) e = nonblockingRetrive(c) if e.Key != "/foo/foo/foo" || e.Action != Update { t.Fatal("watch for Update node fails ", e) } c, _ = s.Watch("/foo/foo/foo", false, 0, 2, 1) - s.TestAndSet("/foo/foo/foo", "car", 0, "bar", Permanent, 3, 1) + s.CompareAndSwap("/foo/foo/foo", "car", 0, "bar", Permanent, 3, 1) e = nonblockingRetrive(c) if e.Key != "/foo/foo/foo" || e.Action != TestAndSet { t.Fatal("watch for TestAndSet node fails") @@ -360,14 +360,14 @@ func TestWatch(t *testing.T) { } c, _ = s.Watch("/foo", true, 0, 5, 1) - s.Update("/foo/foo/boo", "foo", Permanent, 6, 1) + s.update("/foo/foo/boo", "foo", Permanent, 6, 1) e = nonblockingRetrive(c) if e.Key != "/foo/foo/boo" || e.Action != Update { t.Fatal("watch for Update subdirectory fails") } c, _ = s.Watch("/foo", true, 0, 6, 1) - s.TestAndSet("/foo/foo/boo", "foo", 0, "bar", Permanent, 7, 1) + s.CompareAndSwap("/foo/foo/boo", "foo", 0, "bar", Permanent, 7, 1) e = nonblockingRetrive(c) if e.Key != "/foo/foo/boo" || e.Action != TestAndSet { t.Fatal("watch for TestAndSet subdirectory fails") @@ -390,7 +390,7 @@ func TestWatch(t *testing.T) { } s.Create("/foo/foo/boo", "foo", false, false, Permanent, 10, 1) - s.Update("/foo/foo/boo", "bar", time.Now().Add(time.Second*1), 11, 1) + s.update("/foo/foo/boo", "bar", time.Now().Add(time.Second*1), 11, 1) c, _ = s.Watch("/foo", true, 0, 11, 1) time.Sleep(time.Second * 2) e = nonblockingRetrive(c) @@ -399,7 +399,7 @@ func TestWatch(t *testing.T) { } s.Create("/foo/foo/boo", "foo", false, false, Permanent, 12, 1) - s.TestAndSet("/foo/foo/boo", "foo", 0, "bar", time.Now().Add(time.Second*1), 13, 1) + s.CompareAndSwap("/foo/foo/boo", "foo", 0, "bar", time.Now().Add(time.Second*1), 13, 1) c, _ = s.Watch("/foo", true, 0, 13, 1) time.Sleep(time.Second * 2) e = nonblockingRetrive(c) diff --git a/store/update_command.go b/store/update_command.go deleted file mode 100644 index 3152006bc..000000000 --- a/store/update_command.go +++ /dev/null @@ -1,38 +0,0 @@ -package store - -import ( - "time" - - "github.com/coreos/etcd/log" - "github.com/coreos/go-raft" -) - -func init() { - raft.RegisterCommand(&UpdateCommand{}) -} - -// The UpdateCommand updates the value of a key in the Store. -type UpdateCommand struct { - Key string `json:"key"` - Value string `json:"value"` - ExpireTime time.Time `json:"expireTime"` -} - -// The name of the update command in the log -func (c *UpdateCommand) CommandName() string { - return "etcd:update" -} - -// Update node -func (c *UpdateCommand) Apply(server raft.Server) (interface{}, error) { - s, _ := server.StateMachine().(Store) - - e, err := s.Update(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) - - if err != nil { - log.Debug(err) - return nil, err - } - - return e, nil -} From 9ebdcb8ae33a64816a82c71fa6a3d9541bfad6f5 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 14 Oct 2013 22:32:22 -0700 Subject: [PATCH 03/12] refactor change testAndSet to CompareAndSwap --- store/event.go | 12 +++--- store/stats.go | 41 ++++++++++--------- store/stats_test.go | 14 +++---- store/store.go | 10 ++--- store/store_test.go | 10 ++--- .../coreos/go-etcd/etcd/client_test.go | 8 ++-- .../examples/sync-cluster/sync-cluster.go | 1 - .../github.com/coreos/go-log/log/commands.go | 28 ++++++------- .../github.com/coreos/go-log/log/fields.go | 1 + .../github.com/coreos/go-log/log/logger.go | 1 + .../github.com/coreos/go-log/log/priority.go | 1 + .../github.com/coreos/go-log/log/sinks.go | 1 + .../coreos/go-systemd/activation/files.go | 2 +- 13 files changed, 66 insertions(+), 64 deletions(-) diff --git a/store/event.go b/store/event.go index 0d9ec0a37..f9ae0938b 100644 --- a/store/event.go +++ b/store/event.go @@ -5,12 +5,12 @@ import ( ) const ( - Get = "get" - Create = "create" - Update = "update" - Delete = "delete" - TestAndSet = "testAndSet" - Expire = "expire" + Get = "get" + Create = "create" + Update = "update" + Delete = "delete" + CompareAndSwap = "compareAndSwap" + Expire = "expire" ) const ( diff --git a/store/stats.go b/store/stats.go index e2053ed42..4c89b93e3 100644 --- a/store/stats.go +++ b/store/stats.go @@ -6,17 +6,17 @@ import ( ) const ( - SetSuccess = 100 - SetFail = 101 - DeleteSuccess = 102 - DeleteFail = 103 - UpdateSuccess = 104 - UpdateFail = 105 - TestAndSetSuccess = 106 - TestAndSetFail = 107 - GetSuccess = 110 - GetFail = 111 - ExpireCount = 112 + SetSuccess = 100 + SetFail = 101 + DeleteSuccess = 102 + DeleteFail = 103 + UpdateSuccess = 104 + UpdateFail = 105 + CompareAndSwapSuccess = 106 + CompareAndSwapFail = 107 + GetSuccess = 110 + GetFail = 111 + ExpireCount = 112 ) type Stats struct { @@ -38,9 +38,10 @@ type Stats struct { UpdateFail uint64 `json:"updateFail"` // Number of testAndSet requests - TestAndSetSuccess uint64 `json:"testAndSetSuccess"` - TestAndSetFail uint64 `json:"testAndSetFail"` - ExpireCount uint64 `json:"expireCount"` + CompareAndSwapSuccess uint64 `json:"compareAndSwapSuccess"` + CompareAndSwapFail uint64 `json:"compareAndSwapFail"` + + ExpireCount uint64 `json:"expireCount"` Watchers uint64 `json:"watchers"` } @@ -53,7 +54,7 @@ func newStats() *Stats { func (s *Stats) clone() *Stats { return &Stats{s.GetSuccess, s.GetFail, s.SetSuccess, s.SetFail, s.DeleteSuccess, s.DeleteFail, s.UpdateSuccess, s.UpdateFail, - s.TestAndSetSuccess, s.TestAndSetFail, s.Watchers, s.ExpireCount} + s.CompareAndSwapSuccess, s.CompareAndSwapFail, s.Watchers, s.ExpireCount} } // Status() return the statistics info of etcd storage its recent start @@ -69,7 +70,7 @@ func (s *Stats) TotalReads() uint64 { func (s *Stats) TotalWrites() uint64 { return s.SetSuccess + s.SetFail + s.DeleteSuccess + s.DeleteFail + - s.TestAndSetSuccess + s.TestAndSetFail + + s.CompareAndSwapSuccess + s.CompareAndSwapFail + s.UpdateSuccess + s.UpdateFail } @@ -91,10 +92,10 @@ func (s *Stats) Inc(field int) { atomic.AddUint64(&s.UpdateSuccess, 1) case UpdateFail: atomic.AddUint64(&s.UpdateFail, 1) - case TestAndSetSuccess: - atomic.AddUint64(&s.TestAndSetSuccess, 1) - case TestAndSetFail: - atomic.AddUint64(&s.TestAndSetFail, 1) + case CompareAndSwapSuccess: + atomic.AddUint64(&s.CompareAndSwapSuccess, 1) + case CompareAndSwapFail: + atomic.AddUint64(&s.CompareAndSwapFail, 1) case ExpireCount: atomic.AddUint64(&s.ExpireCount, 1) } diff --git a/store/stats_test.go b/store/stats_test.go index fa6483ac2..7c2296fb0 100644 --- a/store/stats_test.go +++ b/store/stats_test.go @@ -12,7 +12,7 @@ func TestBasicStats(t *testing.T) { var i uint64 var GetSuccess, GetFail, SetSuccess, SetFail, DeleteSuccess, DeleteFail uint64 - var UpdateSuccess, UpdateFail, TestAndSetSuccess, TestAndSetFail, watcher_number uint64 + var UpdateSuccess, UpdateFail, CompareAndSwapSuccess, CompareAndSwapFail, watcher_number uint64 for _, k := range keys { i++ @@ -60,9 +60,9 @@ func TestBasicStats(t *testing.T) { i++ _, err := s.CompareAndSwap(k, "foo", 0, "bar", Permanent, i, 1) if err != nil { - TestAndSetFail++ + CompareAndSwapFail++ } else { - TestAndSetSuccess++ + CompareAndSwapSuccess++ } } @@ -132,12 +132,12 @@ func TestBasicStats(t *testing.T) { t.Fatalf("UpdateFail [%d] != Stats.UpdateFail [%d]", UpdateFail, s.Stats.UpdateFail) } - if TestAndSetSuccess != s.Stats.TestAndSetSuccess { - t.Fatalf("TestAndSetSuccess [%d] != Stats.TestAndSetSuccess [%d]", TestAndSetSuccess, s.Stats.TestAndSetSuccess) + if CompareAndSwapSuccess != s.Stats.CompareAndSwapSuccess { + t.Fatalf("TestAndSetSuccess [%d] != Stats.CompareAndSwapSuccess [%d]", CompareAndSwapSuccess, s.Stats.CompareAndSwapSuccess) } - if TestAndSetFail != s.Stats.TestAndSetFail { - t.Fatalf("TestAndSetFail [%d] != Stats.TestAndSetFail [%d]", TestAndSetFail, s.Stats.TestAndSetFail) + if CompareAndSwapFail != s.Stats.CompareAndSwapFail { + t.Fatalf("TestAndSetFail [%d] != Stats.TestAndSetFail [%d]", CompareAndSwapFail, s.Stats.CompareAndSwapFail) } s = newStore() diff --git a/store/store.go b/store/store.go index f25b4345e..6e0fe4d68 100644 --- a/store/store.go +++ b/store/store.go @@ -130,19 +130,19 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint n, err := s.internalGet(nodePath, index, term) if err != nil { - s.Stats.Inc(TestAndSetFail) + s.Stats.Inc(CompareAndSwapFail) return nil, err } if n.IsDir() { // can only test and set file - s.Stats.Inc(TestAndSetFail) + s.Stats.Inc(CompareAndSwapFail) return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term) } // If both of the prevValue and prevIndex are given, we will test both of them. // Command will be executed, only if both of the tests are successful. if (prevValue == "" || n.Value == prevValue) && (prevIndex == 0 || n.ModifiedIndex == prevIndex) { - e := newEvent(TestAndSet, nodePath, index, term) + e := newEvent(CompareAndSwap, nodePath, index, term) e.PrevValue = n.Value // if test succeed, write the value @@ -153,12 +153,12 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint e.Expiration, e.TTL = n.ExpirationAndTTL() s.WatcherHub.notify(e) - s.Stats.Inc(TestAndSetSuccess) + s.Stats.Inc(CompareAndSwapSuccess) return e, nil } cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex) - s.Stats.Inc(TestAndSetFail) + s.Stats.Inc(CompareAndSwapFail) return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, index, term) } diff --git a/store/store_test.go b/store/store_test.go index 457a9b5f5..f26d3d616 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -340,8 +340,8 @@ func TestWatch(t *testing.T) { c, _ = s.Watch("/foo/foo/foo", false, 0, 2, 1) s.CompareAndSwap("/foo/foo/foo", "car", 0, "bar", Permanent, 3, 1) e = nonblockingRetrive(c) - if e.Key != "/foo/foo/foo" || e.Action != TestAndSet { - t.Fatal("watch for TestAndSet node fails") + if e.Key != "/foo/foo/foo" || e.Action != CompareAndSwap { + t.Fatal("watch for CompareAndSwap node fails") } c, _ = s.Watch("/foo/foo/foo", false, 0, 3, 1) @@ -369,8 +369,8 @@ func TestWatch(t *testing.T) { c, _ = s.Watch("/foo", true, 0, 6, 1) s.CompareAndSwap("/foo/foo/boo", "foo", 0, "bar", Permanent, 7, 1) e = nonblockingRetrive(c) - if e.Key != "/foo/foo/boo" || e.Action != TestAndSet { - t.Fatal("watch for TestAndSet subdirectory fails") + if e.Key != "/foo/foo/boo" || e.Action != CompareAndSwap { + t.Fatal("watch for CompareAndSwap subdirectory fails") } c, _ = s.Watch("/foo", true, 0, 7, 1) @@ -404,7 +404,7 @@ func TestWatch(t *testing.T) { time.Sleep(time.Second * 2) e = nonblockingRetrive(c) if e.Key != "/foo/foo/boo" || e.Action != Expire || e.Index != 13 { - t.Fatal("watch for Expiration of TestAndSet() subdirectory fails ", e) + t.Fatal("watch for Expiration of CompareAndSwap() subdirectory fails ", e) } } diff --git a/third_party/github.com/coreos/go-etcd/etcd/client_test.go b/third_party/github.com/coreos/go-etcd/etcd/client_test.go index 29f138113..bf75d8947 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/client_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/client_test.go @@ -2,9 +2,9 @@ package etcd import ( "fmt" - "testing" - "net/url" "net" + "net/url" + "testing" ) // To pass this test, we need to create a cluster of 3 machines @@ -19,7 +19,7 @@ func TestSync(t *testing.T) { t.Fatal("cannot sync machines") } - for _, m := range(c.GetCluster()) { + for _, m := range c.GetCluster() { u, err := url.Parse(m) if err != nil { t.Fatal(err) @@ -27,7 +27,7 @@ func TestSync(t *testing.T) { if u.Scheme != "http" { t.Fatal("scheme must be http") } - + host, _, err := net.SplitHostPort(u.Host) if err != nil { t.Fatal(err) diff --git a/third_party/github.com/coreos/go-etcd/examples/sync-cluster/sync-cluster.go b/third_party/github.com/coreos/go-etcd/examples/sync-cluster/sync-cluster.go index 8249b4bdc..8c7e375c5 100644 --- a/third_party/github.com/coreos/go-etcd/examples/sync-cluster/sync-cluster.go +++ b/third_party/github.com/coreos/go-etcd/examples/sync-cluster/sync-cluster.go @@ -1,4 +1,3 @@ - package main import ( diff --git a/third_party/github.com/coreos/go-log/log/commands.go b/third_party/github.com/coreos/go-log/log/commands.go index 94dc9e152..f39fdef97 100644 --- a/third_party/github.com/coreos/go-log/log/commands.go +++ b/third_party/github.com/coreos/go-log/log/commands.go @@ -1,4 +1,5 @@ package log + // Copyright 2013, CoreOS, Inc. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -42,7 +43,6 @@ func (logger *Logger) Logf(priority Priority, format string, v ...interface{}) { logger.Log(priority, fmt.Sprintf(format, v...)) } - func (logger *Logger) Emergency(v ...interface{}) { logger.Log(PriEmerg, v...) } @@ -99,7 +99,6 @@ func (logger *Logger) Debugf(format string, v ...interface{}) { logger.Log(PriDebug, fmt.Sprintf(format, v...)) } - func Emergency(v ...interface{}) { defaultLogger.Log(PriEmerg, v...) } @@ -158,57 +157,56 @@ func Debugf(format string, v ...interface{}) { // Standard library log functions -func (logger *Logger)Fatalln (v ...interface{}) { +func (logger *Logger) Fatalln(v ...interface{}) { logger.Log(PriCrit, v...) os.Exit(1) } -func (logger *Logger)Fatalf (format string, v ...interface{}) { +func (logger *Logger) Fatalf(format string, v ...interface{}) { logger.Logf(PriCrit, format, v...) os.Exit(1) } -func (logger *Logger)Panicln (v ...interface{}) { +func (logger *Logger) Panicln(v ...interface{}) { s := fmt.Sprint(v...) logger.Log(PriErr, s) panic(s) } -func (logger *Logger)Panicf (format string, v ...interface{}) { +func (logger *Logger) Panicf(format string, v ...interface{}) { s := fmt.Sprintf(format, v...) logger.Log(PriErr, s) panic(s) } -func (logger *Logger)Println (v ...interface{}) { +func (logger *Logger) Println(v ...interface{}) { logger.Log(PriInfo, v...) } -func (logger *Logger)Printf (format string, v ...interface{}) { +func (logger *Logger) Printf(format string, v ...interface{}) { logger.Logf(PriInfo, format, v...) } - -func Fatalln (v ...interface{}) { +func Fatalln(v ...interface{}) { defaultLogger.Log(PriCrit, v...) os.Exit(1) } -func Fatalf (format string, v ...interface{}) { +func Fatalf(format string, v ...interface{}) { defaultLogger.Logf(PriCrit, format, v...) os.Exit(1) } -func Panicln (v ...interface{}) { +func Panicln(v ...interface{}) { s := fmt.Sprint(v...) defaultLogger.Log(PriErr, s) panic(s) } -func Panicf (format string, v ...interface{}) { +func Panicf(format string, v ...interface{}) { s := fmt.Sprintf(format, v...) defaultLogger.Log(PriErr, s) panic(s) } -func Println (v ...interface{}) { +func Println(v ...interface{}) { defaultLogger.Log(PriInfo, v...) } -func Printf (format string, v ...interface{}) { +func Printf(format string, v ...interface{}) { defaultLogger.Logf(PriInfo, format, v...) } diff --git a/third_party/github.com/coreos/go-log/log/fields.go b/third_party/github.com/coreos/go-log/log/fields.go index e8d9698a0..b04edc8eb 100644 --- a/third_party/github.com/coreos/go-log/log/fields.go +++ b/third_party/github.com/coreos/go-log/log/fields.go @@ -1,4 +1,5 @@ package log + // Copyright 2013, CoreOS, Inc. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/third_party/github.com/coreos/go-log/log/logger.go b/third_party/github.com/coreos/go-log/log/logger.go index 2089a11f8..8c3b86c1d 100644 --- a/third_party/github.com/coreos/go-log/log/logger.go +++ b/third_party/github.com/coreos/go-log/log/logger.go @@ -1,4 +1,5 @@ package log + // Copyright 2013, CoreOS, Inc. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/third_party/github.com/coreos/go-log/log/priority.go b/third_party/github.com/coreos/go-log/log/priority.go index ac73fc8a4..c169d6869 100644 --- a/third_party/github.com/coreos/go-log/log/priority.go +++ b/third_party/github.com/coreos/go-log/log/priority.go @@ -1,4 +1,5 @@ package log + // Copyright 2013, CoreOS, Inc. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/third_party/github.com/coreos/go-log/log/sinks.go b/third_party/github.com/coreos/go-log/log/sinks.go index a41f3365d..bdf1e41f1 100644 --- a/third_party/github.com/coreos/go-log/log/sinks.go +++ b/third_party/github.com/coreos/go-log/log/sinks.go @@ -1,4 +1,5 @@ package log + // Copyright 2013, CoreOS, Inc. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/third_party/github.com/coreos/go-systemd/activation/files.go b/third_party/github.com/coreos/go-systemd/activation/files.go index 4b8542370..a0a56f9e6 100644 --- a/third_party/github.com/coreos/go-systemd/activation/files.go +++ b/third_party/github.com/coreos/go-systemd/activation/files.go @@ -24,7 +24,7 @@ func Files() []*os.File { files := []*os.File(nil) for fd := listenFdsStart; fd < listenFdsStart+nfds; fd++ { syscall.CloseOnExec(fd) - files = append(files, os.NewFile(uintptr(fd), "LISTEN_FD_" + strconv.Itoa(fd))) + files = append(files, os.NewFile(uintptr(fd), "LISTEN_FD_"+strconv.Itoa(fd))) } return files } From 2aeb25e80ca73fbb6307da6c447933ff63603c16 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 14 Oct 2013 22:38:07 -0700 Subject: [PATCH 04/12] refactor change handler name to its method --- server/server.go | 8 ++++---- server/v2/{delete_key_handler.go => delete_handler.go} | 2 +- server/v2/{get_key_handler.go => get_handler.go} | 2 +- server/v2/{create_key_handler.go => post_handler.go} | 2 +- server/v2/{update_key_handler.go => put_handler.go} | 2 +- 5 files changed, 8 insertions(+), 8 deletions(-) rename server/v2/{delete_key_handler.go => delete_handler.go} (77%) rename server/v2/{get_key_handler.go => get_handler.go} (95%) rename server/v2/{create_key_handler.go => post_handler.go} (87%) rename server/v2/{update_key_handler.go => put_handler.go} (95%) diff --git a/server/server.go b/server/server.go index 0aeb5dbc4..786947c81 100644 --- a/server/server.go +++ b/server/server.go @@ -102,10 +102,10 @@ func (s *Server) installV1() { } func (s *Server) installV2() { - s.handleFuncV2("/v2/keys/{key:.*}", v2.GetKeyHandler).Methods("GET") - s.handleFuncV2("/v2/keys/{key:.*}", v2.CreateKeyHandler).Methods("POST") - s.handleFuncV2("/v2/keys/{key:.*}", v2.UpdateKeyHandler).Methods("PUT") - s.handleFuncV2("/v2/keys/{key:.*}", v2.DeleteKeyHandler).Methods("DELETE") + s.handleFuncV2("/v2/keys/{key:.*}", v2.GetHandler).Methods("GET") + s.handleFuncV2("/v2/keys/{key:.*}", v2.PostHandler).Methods("POST") + s.handleFuncV2("/v2/keys/{key:.*}", v2.PutHandler).Methods("PUT") + s.handleFuncV2("/v2/keys/{key:.*}", v2.DeleteHandler).Methods("DELETE") s.handleFunc("/v2/leader", s.GetLeaderHandler).Methods("GET") s.handleFunc("/v2/machines", s.GetMachinesHandler).Methods("GET") s.handleFunc("/v2/stats/self", s.GetStatsHandler).Methods("GET") diff --git a/server/v2/delete_key_handler.go b/server/v2/delete_handler.go similarity index 77% rename from server/v2/delete_key_handler.go rename to server/v2/delete_handler.go index c53e72459..9012498cf 100644 --- a/server/v2/delete_key_handler.go +++ b/server/v2/delete_handler.go @@ -7,7 +7,7 @@ import ( "github.com/gorilla/mux" ) -func DeleteKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { +func DeleteHandler(w http.ResponseWriter, req *http.Request, s Server) error { vars := mux.Vars(req) key := "/" + vars["key"] diff --git a/server/v2/get_key_handler.go b/server/v2/get_handler.go similarity index 95% rename from server/v2/get_key_handler.go rename to server/v2/get_handler.go index a7f8e56a1..d0e804273 100644 --- a/server/v2/get_key_handler.go +++ b/server/v2/get_handler.go @@ -13,7 +13,7 @@ import ( "github.com/gorilla/mux" ) -func GetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { +func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error { var err error var event *store.Event diff --git a/server/v2/create_key_handler.go b/server/v2/post_handler.go similarity index 87% rename from server/v2/create_key_handler.go rename to server/v2/post_handler.go index fab9bde1f..dda146e5e 100644 --- a/server/v2/create_key_handler.go +++ b/server/v2/post_handler.go @@ -8,7 +8,7 @@ import ( "github.com/gorilla/mux" ) -func CreateKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { +func PostHandler(w http.ResponseWriter, req *http.Request, s Server) error { vars := mux.Vars(req) key := "/" + vars["key"] diff --git a/server/v2/update_key_handler.go b/server/v2/put_handler.go similarity index 95% rename from server/v2/update_key_handler.go rename to server/v2/put_handler.go index 60c260afc..6a5856f92 100644 --- a/server/v2/update_key_handler.go +++ b/server/v2/put_handler.go @@ -10,7 +10,7 @@ import ( "github.com/gorilla/mux" ) -func UpdateKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { +func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error { vars := mux.Vars(req) key := "/" + vars["key"] From 53a9bd06185ee875f75ddcd4d11400699c600314 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 14 Oct 2013 22:44:17 -0700 Subject: [PATCH 05/12] feat add set command --- server/server.go | 3 +-- server/v1/set_key_handler.go | 3 +-- server/v2/put_handler.go | 3 +-- store/create_command.go | 3 +-- store/set_command.go | 38 ++++++++++++++++++++++++++++++++++++ 5 files changed, 42 insertions(+), 8 deletions(-) create mode 100644 store/set_command.go diff --git a/server/server.go b/server/server.go index 786947c81..36df79f6f 100644 --- a/server/server.go +++ b/server/server.go @@ -254,11 +254,10 @@ func (s *Server) SpeedTestHandler(w http.ResponseWriter, req *http.Request) erro for i := 0; i < count; i++ { go func() { for j := 0; j < 10; j++ { - c := &store.CreateCommand{ + c := &store.SetCommand{ Key: "foo", Value: "bar", ExpireTime: time.Unix(0, 0), - Force: true, } s.peerServer.RaftServer().Do(c) } diff --git a/server/v1/set_key_handler.go b/server/v1/set_key_handler.go index acd4037c1..887c8c22e 100644 --- a/server/v1/set_key_handler.go +++ b/server/v1/set_key_handler.go @@ -39,11 +39,10 @@ func SetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { } } else { - c = &store.CreateCommand{ + c = &store.SetCommand{ Key: key, Value: value, ExpireTime: expireTime, - Force: true, } } diff --git a/server/v2/put_handler.go b/server/v2/put_handler.go index 6a5856f92..a0580f840 100644 --- a/server/v2/put_handler.go +++ b/server/v2/put_handler.go @@ -35,11 +35,10 @@ func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error { // Set command: create a new node or replace the old one. if !valueOk && !indexOk && !existOk { - c = &store.CreateCommand{ + c = &store.SetCommand{ Key: key, Value: value, ExpireTime: expireTime, - Force: true, } return s.Dispatch(c, w, req) } diff --git a/store/create_command.go b/store/create_command.go index 43c09f998..c1f57910a 100644 --- a/store/create_command.go +++ b/store/create_command.go @@ -16,7 +16,6 @@ type CreateCommand struct { Value string `json:"value"` ExpireTime time.Time `json:"expireTime"` IncrementalSuffix bool `json:"incrementalSuffix"` - Force bool `json:"force"` } // The name of the create command in the log @@ -28,7 +27,7 @@ func (c *CreateCommand) CommandName() string { func (c *CreateCommand) Apply(server raft.Server) (interface{}, error) { s, _ := server.StateMachine().(Store) - e, err := s.Create(c.Key, c.Value, c.IncrementalSuffix, c.Force, c.ExpireTime, server.CommitIndex(), server.Term()) + e, err := s.Create(c.Key, c.Value, c.IncrementalSuffix, false, c.ExpireTime, server.CommitIndex(), server.Term()) if err != nil { log.Debug(err) diff --git a/store/set_command.go b/store/set_command.go new file mode 100644 index 000000000..ac8e2cf58 --- /dev/null +++ b/store/set_command.go @@ -0,0 +1,38 @@ +package store + +import ( + "github.com/coreos/etcd/log" + "github.com/coreos/go-raft" + "time" +) + +func init() { + raft.RegisterCommand(&CreateCommand{}) +} + +// Create command +type SetCommand struct { + Key string `json:"key"` + Value string `json:"value"` + ExpireTime time.Time `json:"expireTime"` +} + +// The name of the create command in the log +func (c *SetCommand) CommandName() string { + return "etcd:set" +} + +// Create node +func (c *SetCommand) Apply(server raft.Server) (interface{}, error) { + s, _ := server.StateMachine().(Store) + + // create a new node or replace the old node. + e, err := s.Create(c.Key, c.Value, false, true, c.ExpireTime, server.CommitIndex(), server.Term()) + + if err != nil { + log.Debug(err) + return nil, err + } + + return e, nil +} From 278a0899083fff7ea20d63fd8f6d11b45a7dd201 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 14 Oct 2013 22:45:29 -0700 Subject: [PATCH 06/12] fix set should register set rather than create --- store/set_command.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/set_command.go b/store/set_command.go index ac8e2cf58..6c2bd6885 100644 --- a/store/set_command.go +++ b/store/set_command.go @@ -7,7 +7,7 @@ import ( ) func init() { - raft.RegisterCommand(&CreateCommand{}) + raft.RegisterCommand(&SetCommand{}) } // Create command From fbf40fb74a1c700fe786bc251eef1cc3d89738d5 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 14 Oct 2013 23:04:21 -0700 Subject: [PATCH 07/12] refactor store.go add set function --- server/registry.go | 2 +- store/create_command.go | 2 +- store/event.go | 3 ++- store/set_command.go | 2 +- store/store.go | 18 ++++++++++++++---- 5 files changed, 19 insertions(+), 8 deletions(-) diff --git a/server/registry.go b/server/registry.go index 23ef9ddbb..fa63b5027 100644 --- a/server/registry.go +++ b/server/registry.go @@ -45,7 +45,7 @@ func (r *Registry) Register(name string, peerVersion string, peerURL string, url // Write data to store. key := path.Join(RegistryKey, name) value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", peerURL, url, peerVersion) - _, err := r.store.Create(key, value, false, false, store.Permanent, commitIndex, term) + _, err := r.store.Create(key, value, false, store.Permanent, commitIndex, term) log.Debugf("Register: %s (%v)", name, err) return err } diff --git a/store/create_command.go b/store/create_command.go index c1f57910a..b9f1aced5 100644 --- a/store/create_command.go +++ b/store/create_command.go @@ -27,7 +27,7 @@ func (c *CreateCommand) CommandName() string { func (c *CreateCommand) Apply(server raft.Server) (interface{}, error) { s, _ := server.StateMachine().(Store) - e, err := s.Create(c.Key, c.Value, c.IncrementalSuffix, false, c.ExpireTime, server.CommitIndex(), server.Term()) + e, err := s.Create(c.Key, c.Value, c.IncrementalSuffix, c.ExpireTime, server.CommitIndex(), server.Term()) if err != nil { log.Debug(err) diff --git a/store/event.go b/store/event.go index f9ae0938b..14ba1e529 100644 --- a/store/event.go +++ b/store/event.go @@ -7,6 +7,7 @@ import ( const ( Get = "get" Create = "create" + Set = "set" Update = "update" Delete = "delete" CompareAndSwap = "compareAndSwap" @@ -54,7 +55,7 @@ func (event *Event) Response() interface{} { Expiration: event.Expiration, } - if response.Action == Create || response.Action == Update { + if response.Action == Create || response.Action == Set { response.Action = "set" if response.PrevValue == "" { response.NewKey = true diff --git a/store/set_command.go b/store/set_command.go index 6c2bd6885..55635cd99 100644 --- a/store/set_command.go +++ b/store/set_command.go @@ -27,7 +27,7 @@ func (c *SetCommand) Apply(server raft.Server) (interface{}, error) { s, _ := server.StateMachine().(Store) // create a new node or replace the old node. - e, err := s.Create(c.Key, c.Value, false, true, c.ExpireTime, server.CommitIndex(), server.Term()) + e, err := s.Set(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) if err != nil { log.Debug(err) diff --git a/store/store.go b/store/store.go index 6e0fe4d68..35dfd32c5 100644 --- a/store/store.go +++ b/store/store.go @@ -15,8 +15,9 @@ import ( type Store interface { Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error) - Create(nodePath string, value string, incrementalSuffix bool, force bool, - expireTime time.Time, index uint64, term uint64) (*Event, error) + Set(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) + Create(nodePath string, value string, incrementalSuffix bool, expireTime time.Time, + index uint64, term uint64) (*Event, error) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error) @@ -106,13 +107,22 @@ func (s *store) Get(nodePath string, recursive, sorted bool, index uint64, term // Create function creates the Node at nodePath. Create will help to create intermediate directories with no ttl. // If the node has already existed, create will fail. // If any node on the path is a file, create will fail. -func (s *store) Create(nodePath string, value string, incrementalSuffix bool, force bool, +func (s *store) Create(nodePath string, value string, incrementalSuffix bool, expireTime time.Time, index uint64, term uint64) (*Event, error) { nodePath = path.Clean(path.Join("/", nodePath)) s.worldLock.Lock() defer s.worldLock.Unlock() - return s.internalCreate(nodePath, value, incrementalSuffix, force, expireTime, index, term, Create) + return s.internalCreate(nodePath, value, incrementalSuffix, false, expireTime, index, term, Create) +} + +// Set function creates or replace the Node at nodePath. +func (s *store) Set(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { + nodePath = path.Clean(path.Join("/", nodePath)) + + s.worldLock.Lock() + defer s.worldLock.Unlock() + return s.internalCreate(nodePath, value, false, true, expireTime, index, term, Set) } func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64, From c5f9afa0e87cceeb8235def4a92166db6a06ad91 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 14 Oct 2013 23:15:31 -0700 Subject: [PATCH 08/12] fix store test --- store/stats_test.go | 4 +-- store/store.go | 4 +-- store/store_test.go | 64 ++++++++++++++++++++++----------------------- 3 files changed, 36 insertions(+), 36 deletions(-) diff --git a/store/stats_test.go b/store/stats_test.go index 7c2296fb0..6f3b34242 100644 --- a/store/stats_test.go +++ b/store/stats_test.go @@ -16,7 +16,7 @@ func TestBasicStats(t *testing.T) { for _, k := range keys { i++ - _, err := s.Create(k, "bar", false, false, time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1) + _, err := s.Create(k, "bar", false, time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1) if err != nil { SetFail++ } else { @@ -146,7 +146,7 @@ func TestBasicStats(t *testing.T) { for _, k := range keys { i++ - _, err := s.Create(k, "bar", false, false, time.Now().Add(time.Second*3), i, 1) + _, err := s.Create(k, "bar", false, time.Now().Add(time.Second*3), i, 1) if err != nil { SetFail++ } else { diff --git a/store/store.go b/store/store.go index 35dfd32c5..8cecfb178 100644 --- a/store/store.go +++ b/store/store.go @@ -302,7 +302,7 @@ func (s *store) update(nodePath string, newValue string, expireTime time.Time, i return e, nil } -func (s *store) internalCreate(nodePath string, value string, incrementalSuffix bool, force bool, +func (s *store) internalCreate(nodePath string, value string, incrementalSuffix bool, replace bool, expireTime time.Time, index uint64, term uint64, action string) (*Event, error) { s.Index, s.Term = index, term @@ -330,7 +330,7 @@ func (s *store) internalCreate(nodePath string, value string, incrementalSuffix // force will try to replace a existing file if n != nil { - if force { + if replace { if n.IsDir() { return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term) } diff --git a/store/store_test.go b/store/store_test.go index f26d3d616..dd6a2f818 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -10,10 +10,10 @@ import ( func TestCreateAndGet(t *testing.T) { s := newStore() - s.Create("/foobar", "bar", false, false, Permanent, 1, 1) + s.Create("/foobar", "bar", false, Permanent, 1, 1) // already exist, create should fail - _, err := s.Create("/foobar", "bar", false, false, Permanent, 1, 1) + _, err := s.Create("/foobar", "bar", false, Permanent, 1, 1) if err == nil { t.Fatal("Create should fail") @@ -21,10 +21,10 @@ func TestCreateAndGet(t *testing.T) { s.Delete("/foobar", true, 1, 1) - s.Create("/foobar/foo", "bar", false, false, Permanent, 1, 1) + s.Create("/foobar/foo", "bar", false, Permanent, 1, 1) // already exist, create should fail - _, err = s.Create("/foobar", "bar", false, false, Permanent, 1, 1) + _, err = s.Create("/foobar", "bar", false, Permanent, 1, 1) if err == nil { t.Fatal("Create should fail") @@ -38,14 +38,14 @@ func TestCreateAndGet(t *testing.T) { createAndGet(s, "/foo/foo/bar", t) // meet file, create should fail - _, err = s.Create("/foo/bar/bar", "bar", false, false, Permanent, 2, 1) + _, err = s.Create("/foo/bar/bar", "bar", false, Permanent, 2, 1) if err == nil { t.Fatal("Create should fail") } // create a directory - _, err = s.Create("/fooDir", "", false, false, Permanent, 3, 1) + _, err = s.Create("/fooDir", "", false, Permanent, 3, 1) if err != nil { t.Fatal("Cannot create /fooDir") @@ -58,7 +58,7 @@ func TestCreateAndGet(t *testing.T) { } // create a file under directory - _, err = s.Create("/fooDir/bar", "bar", false, false, Permanent, 4, 1) + _, err = s.Create("/fooDir/bar", "bar", false, Permanent, 4, 1) if err != nil { t.Fatal("Cannot create /fooDir/bar = bar") @@ -68,7 +68,7 @@ func TestCreateAndGet(t *testing.T) { func TestUpdateFile(t *testing.T) { s := newStore() - _, err := s.Create("/foo/bar", "bar", false, false, Permanent, 1, 1) + _, err := s.Create("/foo/bar", "bar", false, Permanent, 1, 1) if err != nil { t.Fatalf("cannot create %s=bar [%s]", "/foo/bar", err.Error()) @@ -91,24 +91,24 @@ func TestUpdateFile(t *testing.T) { } // create a directory, update its ttl, to see if it will be deleted - _, err = s.Create("/foo/foo", "", false, false, Permanent, 3, 1) + _, err = s.Create("/foo/foo", "", false, Permanent, 3, 1) if err != nil { t.Fatalf("cannot create dir [%s] [%s]", "/foo/foo", err.Error()) } - _, err = s.Create("/foo/foo/foo1", "bar1", false, false, Permanent, 4, 1) + _, err = s.Create("/foo/foo/foo1", "bar1", false, Permanent, 4, 1) if err != nil { t.Fatal("cannot create [%s]", err.Error()) } - _, err = s.Create("/foo/foo/foo2", "", false, false, Permanent, 5, 1) + _, err = s.Create("/foo/foo/foo2", "", false, Permanent, 5, 1) if err != nil { t.Fatal("cannot create [%s]", err.Error()) } - _, err = s.Create("/foo/foo/foo2/boo", "boo1", false, false, Permanent, 6, 1) + _, err = s.Create("/foo/foo/foo2/boo", "boo1", false, Permanent, 6, 1) if err != nil { t.Fatal("cannot create [%s]", err.Error()) } @@ -165,11 +165,11 @@ func TestListDirectory(t *testing.T) { // create dir /foo // set key-value /foo/foo=bar - s.Create("/foo/foo", "bar", false, false, Permanent, 1, 1) + s.Create("/foo/foo", "bar", false, Permanent, 1, 1) // create dir /foo/fooDir // set key-value /foo/fooDir/foo=bar - s.Create("/foo/fooDir/foo", "bar", false, false, Permanent, 2, 1) + s.Create("/foo/fooDir/foo", "bar", false, Permanent, 2, 1) e, err := s.Get("/foo", true, false, 2, 1) @@ -196,7 +196,7 @@ func TestListDirectory(t *testing.T) { // create dir /foo/_hidden // set key-value /foo/_hidden/foo -> bar - s.Create("/foo/_hidden/foo", "bar", false, false, Permanent, 3, 1) + s.Create("/foo/_hidden/foo", "bar", false, Permanent, 3, 1) e, _ = s.Get("/foo", false, false, 2, 1) @@ -208,7 +208,7 @@ func TestListDirectory(t *testing.T) { func TestRemove(t *testing.T) { s := newStore() - s.Create("/foo", "bar", false, false, Permanent, 1, 1) + s.Create("/foo", "bar", false, Permanent, 1, 1) _, err := s.Delete("/foo", false, 1, 1) if err != nil { @@ -221,9 +221,9 @@ func TestRemove(t *testing.T) { t.Fatalf("can get the node after deletion") } - s.Create("/foo/bar", "bar", false, false, Permanent, 1, 1) - s.Create("/foo/car", "car", false, false, Permanent, 1, 1) - s.Create("/foo/dar/dar", "dar", false, false, Permanent, 1, 1) + s.Create("/foo/bar", "bar", false, Permanent, 1, 1) + s.Create("/foo/car", "car", false, Permanent, 1, 1) + s.Create("/foo/dar/dar", "dar", false, Permanent, 1, 1) _, err = s.Delete("/foo", false, 1, 1) @@ -249,7 +249,7 @@ func TestExpire(t *testing.T) { expire := time.Now().Add(time.Second) - s.Create("/foo", "bar", false, false, expire, 1, 1) + s.Create("/foo", "bar", false, expire, 1, 1) _, err := s.Get("/foo", false, false, 1, 1) @@ -267,7 +267,7 @@ func TestExpire(t *testing.T) { // test if we can reach the node before expiration expire = time.Now().Add(time.Second) - s.Create("/foo", "bar", false, false, expire, 1, 1) + s.Create("/foo", "bar", false, expire, 1, 1) time.Sleep(time.Millisecond * 50) _, err = s.Get("/foo", false, false, 1, 1) @@ -278,7 +278,7 @@ func TestExpire(t *testing.T) { expire = time.Now().Add(time.Second) - s.Create("/foo", "bar", false, false, expire, 1, 1) + s.Create("/foo", "bar", false, expire, 1, 1) _, err = s.Delete("/foo", false, 1, 1) if err != nil { @@ -288,7 +288,7 @@ func TestExpire(t *testing.T) { func TestCompareAndSwap(t *testing.T) { // TODO prevValue == nil ? s := newStore() - s.Create("/foo", "bar", false, false, Permanent, 1, 1) + s.Create("/foo", "bar", false, Permanent, 1, 1) // test on wrong previous value _, err := s.CompareAndSwap("/foo", "barbar", 0, "car", Permanent, 2, 1) @@ -323,7 +323,7 @@ func TestWatch(t *testing.T) { s := newStore() // watch at a deeper path c, _ := s.Watch("/foo/foo/foo", false, 0, 0, 1) - s.Create("/foo/foo/foo", "bar", false, false, Permanent, 1, 1) + s.Create("/foo/foo/foo", "bar", false, Permanent, 1, 1) e := nonblockingRetrive(c) if e.Key != "/foo/foo/foo" || e.Action != Create { @@ -353,7 +353,7 @@ func TestWatch(t *testing.T) { // watch at a prefix c, _ = s.Watch("/foo", true, 0, 4, 1) - s.Create("/foo/foo/boo", "bar", false, false, Permanent, 5, 1) + s.Create("/foo/foo/boo", "bar", false, Permanent, 5, 1) e = nonblockingRetrive(c) if e.Key != "/foo/foo/boo" || e.Action != Create { t.Fatal("watch for Create subdirectory fails") @@ -381,7 +381,7 @@ func TestWatch(t *testing.T) { } // watch expire - s.Create("/foo/foo/boo", "foo", false, false, time.Now().Add(time.Second*1), 9, 1) + s.Create("/foo/foo/boo", "foo", false, time.Now().Add(time.Second*1), 9, 1) c, _ = s.Watch("/foo", true, 0, 9, 1) time.Sleep(time.Second * 2) e = nonblockingRetrive(c) @@ -389,7 +389,7 @@ func TestWatch(t *testing.T) { t.Fatal("watch for Expiration of Create() subdirectory fails ", e) } - s.Create("/foo/foo/boo", "foo", false, false, Permanent, 10, 1) + s.Create("/foo/foo/boo", "foo", false, Permanent, 10, 1) s.update("/foo/foo/boo", "bar", time.Now().Add(time.Second*1), 11, 1) c, _ = s.Watch("/foo", true, 0, 11, 1) time.Sleep(time.Second * 2) @@ -398,7 +398,7 @@ func TestWatch(t *testing.T) { t.Fatal("watch for Expiration of Update() subdirectory fails ", e) } - s.Create("/foo/foo/boo", "foo", false, false, Permanent, 12, 1) + s.Create("/foo/foo/boo", "foo", false, Permanent, 12, 1) s.CompareAndSwap("/foo/foo/boo", "foo", 0, "bar", time.Now().Add(time.Second*1), 13, 1) c, _ = s.Watch("/foo", true, 0, 13, 1) time.Sleep(time.Second * 2) @@ -416,7 +416,7 @@ func TestSort(t *testing.T) { i := uint64(1) for _, k := range keys { - _, err := s.Create(k, "bar", false, false, Permanent, i, 1) + _, err := s.Create(k, "bar", false, Permanent, i, 1) if err != nil { panic(err) } else { @@ -454,7 +454,7 @@ func TestSaveAndRecover(t *testing.T) { i := uint64(1) for _, k := range keys { - _, err := s.Create(k, "bar", false, false, Permanent, i, 1) + _, err := s.Create(k, "bar", false, Permanent, i, 1) if err != nil { panic(err) } else { @@ -466,7 +466,7 @@ func TestSaveAndRecover(t *testing.T) { // test if we can reach the node before expiration expire := time.Now().Add(time.Second) - s.Create("/foo/foo", "bar", false, false, expire, 1, 1) + s.Create("/foo/foo", "bar", false, expire, 1, 1) b, err := s.Save() cloneFs := newStore() @@ -522,7 +522,7 @@ func GenKeys(num int, depth int) []string { } func createAndGet(s *store, path string, t *testing.T) { - _, err := s.Create(path, "bar", false, false, Permanent, 1, 1) + _, err := s.Create(path, "bar", false, Permanent, 1, 1) if err != nil { t.Fatalf("cannot create %s=bar [%s]", path, err.Error()) From 01bbad31c773a51c2433d727ddbc47d08c346f29 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 14 Oct 2013 23:16:48 -0700 Subject: [PATCH 09/12] refactor remove web pacakge --- web/conn.go | 30 ------------------- web/file2gostring.sh | 28 ------------------ web/hub.go | 61 -------------------------------------- web/index.go | 5 ---- web/index.html | 70 -------------------------------------------- web/web.go | 50 ------------------------------- 6 files changed, 244 deletions(-) delete mode 100644 web/conn.go delete mode 100755 web/file2gostring.sh delete mode 100644 web/hub.go delete mode 100644 web/index.go delete mode 100644 web/index.html delete mode 100644 web/web.go diff --git a/web/conn.go b/web/conn.go deleted file mode 100644 index 25e871635..000000000 --- a/web/conn.go +++ /dev/null @@ -1,30 +0,0 @@ -package web - -import ( - "code.google.com/p/go.net/websocket" -) - -type connection struct { - // The websocket connection. - ws *websocket.Conn - - // Buffered channel of outbound messages. - send chan string -} - -func (c *connection) writer() { - for message := range c.send { - err := websocket.Message.Send(c.ws, message) - if err != nil { - break - } - } - c.ws.Close() -} - -func wsHandler(ws *websocket.Conn) { - c := &connection{send: make(chan string, 256), ws: ws} - h.register <- c - defer func() { h.unregister <- c }() - c.writer() -} diff --git a/web/file2gostring.sh b/web/file2gostring.sh deleted file mode 100755 index 483b5fd31..000000000 --- a/web/file2gostring.sh +++ /dev/null @@ -1,28 +0,0 @@ -#!/bin/sh - -# this file is copied from doozerd. - -set -e - -munge() { - printf %s "$1" | tr . _ | tr -d -c '[:alnum:]_' -} - -quote() { - sed 's/\\/\\\\/g' | sed 's/"/\\"/g' | sed 's/$/\\n/' | tr -d '\n' -} - -pkg_path=$1 ; shift -file=$1 ; shift - -pkg=`basename $pkg_path` - -printf 'package %s\n' "$pkg" -printf '\n' -printf '// This file was generated from %s.\n' "$file" -printf '\n' -printf 'var ' -munge "`basename $file`" -printf ' string = "' -quote -printf '"\n' \ No newline at end of file diff --git a/web/hub.go b/web/hub.go deleted file mode 100644 index 47f203f72..000000000 --- a/web/hub.go +++ /dev/null @@ -1,61 +0,0 @@ -package web - -type hub struct { - // status - open bool - - // Registered connections. - connections map[*connection]bool - - // Inbound messages from the connections. - broadcast chan string - - // Register requests from the connections. - register chan *connection - - // Unregister requests from connections. - unregister chan *connection -} - -var h = hub{ - open: false, - broadcast: make(chan string), - register: make(chan *connection), - unregister: make(chan *connection), - connections: make(map[*connection]bool), -} - -func Hub() *hub { - return &h -} - -func HubOpen() bool { - return h.open -} - -func (h *hub) run() { - h.open = true - for { - select { - case c := <-h.register: - h.connections[c] = true - case c := <-h.unregister: - delete(h.connections, c) - close(c.send) - case m := <-h.broadcast: - for c := range h.connections { - select { - case c.send <- m: - default: - delete(h.connections, c) - close(c.send) - go c.ws.Close() - } - } - } - } -} - -func (h *hub) Send(msg string) { - h.broadcast <- msg -} diff --git a/web/index.go b/web/index.go deleted file mode 100644 index 2e30b6d77..000000000 --- a/web/index.go +++ /dev/null @@ -1,5 +0,0 @@ -package web - -// This file was generated from index.html. - -var index_html string = "\n\netcd Web Interface\n\n\n\n\n
Leader: {{.Leader}}
\n
\n\n\n" diff --git a/web/index.html b/web/index.html deleted file mode 100644 index 919bc98b2..000000000 --- a/web/index.html +++ /dev/null @@ -1,70 +0,0 @@ - - -etcd Web Interface - - - - -
Leader: {{.Leader}}
-
- - diff --git a/web/web.go b/web/web.go deleted file mode 100644 index 723eb05c8..000000000 --- a/web/web.go +++ /dev/null @@ -1,50 +0,0 @@ -package web - -import ( - "code.google.com/p/go.net/websocket" - "fmt" - "github.com/coreos/go-raft" - "html/template" - "net/http" - "net/url" -) - -var mainTempl *template.Template -var mainPage *MainPage - -type MainPage struct { - Leader string - Address string -} - -func mainHandler(c http.ResponseWriter, req *http.Request) { - p := mainPage - - mainTempl.Execute(c, p) -} - -func Start(raftServer raft.Server, webURL string) { - u, _ := url.Parse(webURL) - - webMux := http.NewServeMux() - - server := &http.Server{ - Handler: webMux, - Addr: u.Host, - } - - mainPage = &MainPage{ - Leader: raftServer.Leader(), - Address: u.Host, - } - - mainTempl = template.Must(template.New("index.html").Parse(index_html)) - - go h.run() - webMux.HandleFunc("/", mainHandler) - webMux.Handle("/ws", websocket.Handler(wsHandler)) - - fmt.Printf("etcd web server [%s] listening on %s\n", raftServer.Name(), u) - - server.ListenAndServe() -} From baa683b48443fd1fc4012aa3ef3370a4a6a37687 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 15 Oct 2013 22:21:55 -0700 Subject: [PATCH 10/12] feat POST-create unique node under given path --- server/v2/post_handler.go | 8 ++++---- store/create_command.go | 10 +++++----- store/store.go | 10 +++++----- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/server/v2/post_handler.go b/server/v2/post_handler.go index dda146e5e..b55eddba4 100644 --- a/server/v2/post_handler.go +++ b/server/v2/post_handler.go @@ -19,10 +19,10 @@ func PostHandler(w http.ResponseWriter, req *http.Request, s Server) error { } c := &store.CreateCommand{ - Key: key, - Value: value, - ExpireTime: expireTime, - IncrementalSuffix: (req.FormValue("incremental") == "true"), + Key: key, + Value: value, + ExpireTime: expireTime, + Unique: true, } return s.Dispatch(c, w, req) diff --git a/store/create_command.go b/store/create_command.go index b9f1aced5..6a2487cf0 100644 --- a/store/create_command.go +++ b/store/create_command.go @@ -12,10 +12,10 @@ func init() { // Create command type CreateCommand struct { - Key string `json:"key"` - Value string `json:"value"` - ExpireTime time.Time `json:"expireTime"` - IncrementalSuffix bool `json:"incrementalSuffix"` + Key string `json:"key"` + Value string `json:"value"` + ExpireTime time.Time `json:"expireTime"` + Unique bool `json:"unique"` } // The name of the create command in the log @@ -27,7 +27,7 @@ func (c *CreateCommand) CommandName() string { func (c *CreateCommand) Apply(server raft.Server) (interface{}, error) { s, _ := server.StateMachine().(Store) - e, err := s.Create(c.Key, c.Value, c.IncrementalSuffix, c.ExpireTime, server.CommitIndex(), server.Term()) + e, err := s.Create(c.Key, c.Value, c.Unique, c.ExpireTime, server.CommitIndex(), server.Term()) if err != nil { log.Debug(err) diff --git a/store/store.go b/store/store.go index 8cecfb178..73d56bee5 100644 --- a/store/store.go +++ b/store/store.go @@ -107,13 +107,13 @@ func (s *store) Get(nodePath string, recursive, sorted bool, index uint64, term // Create function creates the Node at nodePath. Create will help to create intermediate directories with no ttl. // If the node has already existed, create will fail. // If any node on the path is a file, create will fail. -func (s *store) Create(nodePath string, value string, incrementalSuffix bool, +func (s *store) Create(nodePath string, value string, unique bool, expireTime time.Time, index uint64, term uint64) (*Event, error) { nodePath = path.Clean(path.Join("/", nodePath)) s.worldLock.Lock() defer s.worldLock.Unlock() - return s.internalCreate(nodePath, value, incrementalSuffix, false, expireTime, index, term, Create) + return s.internalCreate(nodePath, value, unique, false, expireTime, index, term, Create) } // Set function creates or replace the Node at nodePath. @@ -302,13 +302,13 @@ func (s *store) update(nodePath string, newValue string, expireTime time.Time, i return e, nil } -func (s *store) internalCreate(nodePath string, value string, incrementalSuffix bool, replace bool, +func (s *store) internalCreate(nodePath string, value string, unique bool, replace bool, expireTime time.Time, index uint64, term uint64, action string) (*Event, error) { s.Index, s.Term = index, term - if incrementalSuffix { // append unique incremental suffix to the node path - nodePath += "_" + strconv.FormatUint(index, 10) + if unique { // append unique item under the node path + nodePath += "/" + strconv.FormatUint(index, 10) } nodePath = path.Clean(path.Join("/", nodePath)) From 0392c187945888e9d29d45e21293d1f85b11008f Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 15 Oct 2013 23:18:03 -0700 Subject: [PATCH 11/12] refactor put_handler.go --- server/v2/put_handler.go | 47 ++++++++++++++++++++++++++++++---------- store/stats.go | 36 +++++++++++++++++++----------- store/stats_test.go | 26 +++++++++++----------- store/store.go | 36 ++++++++++++++++++------------ store/store_test.go | 10 ++++----- store/update_command.go | 37 +++++++++++++++++++++++++++++++ 6 files changed, 135 insertions(+), 57 deletions(-) create mode 100644 store/update_command.go diff --git a/server/v2/put_handler.go b/server/v2/put_handler.go index a0580f840..5dcf061c2 100644 --- a/server/v2/put_handler.go +++ b/server/v2/put_handler.go @@ -3,6 +3,7 @@ package v2 import ( "net/http" "strconv" + "time" etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/store" @@ -33,14 +34,9 @@ func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error { var c raft.Command - // Set command: create a new node or replace the old one. + // Set handler: create a new node or replace the old one. if !valueOk && !indexOk && !existOk { - c = &store.SetCommand{ - Key: key, - Value: value, - ExpireTime: expireTime, - } - return s.Dispatch(c, w, req) + return SetHandler(w, req, s, key, value, expireTime) } // update with test @@ -48,11 +44,11 @@ func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error { if prevExist[0] == "false" { // Create command: create a new node. Fail, if a node already exists // Ignore prevIndex and prevValue - c = &store.CreateCommand{ - Key: key, - Value: value, - ExpireTime: expireTime, - } + return CreateHandler(w, req, s, key, value, expireTime) + } + + if prevExist[0] == "true" && !indexOk && !valueOk { + return UpdateHandler(w, req, s, key, value, expireTime) } } @@ -84,3 +80,30 @@ func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error { return s.Dispatch(c, w, req) } + +func SetHandler(w http.ResponseWriter, req *http.Request, s Server, key, value string, expireTime time.Time) error { + c := &store.SetCommand{ + Key: key, + Value: value, + ExpireTime: expireTime, + } + return s.Dispatch(c, w, req) +} + +func CreateHandler(w http.ResponseWriter, req *http.Request, s Server, key, value string, expireTime time.Time) error { + c := &store.CreateCommand{ + Key: key, + Value: value, + ExpireTime: expireTime, + } + return s.Dispatch(c, w, req) +} + +func UpdateHandler(w http.ResponseWriter, req *http.Request, s Server, key, value string, expireTime time.Time) error { + c := &store.UpdateCommand{ + Key: key, + Value: value, + ExpireTime: expireTime, + } + return s.Dispatch(c, w, req) +} diff --git a/store/stats.go b/store/stats.go index 4c89b93e3..5f4d26d91 100644 --- a/store/stats.go +++ b/store/stats.go @@ -6,17 +6,19 @@ import ( ) const ( - SetSuccess = 100 - SetFail = 101 - DeleteSuccess = 102 - DeleteFail = 103 - UpdateSuccess = 104 - UpdateFail = 105 - CompareAndSwapSuccess = 106 - CompareAndSwapFail = 107 - GetSuccess = 110 - GetFail = 111 - ExpireCount = 112 + SetSuccess = iota + SetFail + DeleteSuccess + DeleteFail + CreateSuccess + CreateFail + UpdateSuccess + UpdateFail + CompareAndSwapSuccess + CompareAndSwapFail + GetSuccess + GetFail + ExpireCount ) type Stats struct { @@ -37,6 +39,10 @@ type Stats struct { UpdateSuccess uint64 `json:"updateSuccess"` UpdateFail uint64 `json:"updateFail"` + // Number of create requests + CreateSuccess uint64 `json:"createSuccess"` + CreateFail uint64 `json:createFail` + // Number of testAndSet requests CompareAndSwapSuccess uint64 `json:"compareAndSwapSuccess"` CompareAndSwapFail uint64 `json:"compareAndSwapFail"` @@ -53,8 +59,8 @@ func newStats() *Stats { func (s *Stats) clone() *Stats { return &Stats{s.GetSuccess, s.GetFail, s.SetSuccess, s.SetFail, - s.DeleteSuccess, s.DeleteFail, s.UpdateSuccess, s.UpdateFail, - s.CompareAndSwapSuccess, s.CompareAndSwapFail, s.Watchers, s.ExpireCount} + s.DeleteSuccess, s.DeleteFail, s.UpdateSuccess, s.UpdateFail, s.CreateSuccess, + s.CreateFail, s.CompareAndSwapSuccess, s.CompareAndSwapFail, s.Watchers, s.ExpireCount} } // Status() return the statistics info of etcd storage its recent start @@ -80,6 +86,10 @@ func (s *Stats) Inc(field int) { atomic.AddUint64(&s.SetSuccess, 1) case SetFail: atomic.AddUint64(&s.SetFail, 1) + case CreateSuccess: + atomic.AddUint64(&s.CreateSuccess, 1) + case CreateFail: + atomic.AddUint64(&s.CreateFail, 1) case DeleteSuccess: atomic.AddUint64(&s.DeleteSuccess, 1) case DeleteFail: diff --git a/store/stats_test.go b/store/stats_test.go index 6f3b34242..41c032ea5 100644 --- a/store/stats_test.go +++ b/store/stats_test.go @@ -11,16 +11,16 @@ func TestBasicStats(t *testing.T) { keys := GenKeys(rand.Intn(100), 5) var i uint64 - var GetSuccess, GetFail, SetSuccess, SetFail, DeleteSuccess, DeleteFail uint64 + var GetSuccess, GetFail, CreateSuccess, CreateFail, DeleteSuccess, DeleteFail uint64 var UpdateSuccess, UpdateFail, CompareAndSwapSuccess, CompareAndSwapFail, watcher_number uint64 for _, k := range keys { i++ _, err := s.Create(k, "bar", false, time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1) if err != nil { - SetFail++ + CreateFail++ } else { - SetSuccess++ + CreateSuccess++ } } @@ -37,7 +37,7 @@ func TestBasicStats(t *testing.T) { for _, k := range keys { i++ - _, err := s.update(k, "foo", time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1) + _, err := s.Update(k, "foo", time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1) if err != nil { UpdateFail++ } else { @@ -108,12 +108,12 @@ func TestBasicStats(t *testing.T) { t.Fatalf("GetFail [%d] != Stats.GetFail [%d]", GetFail, s.Stats.GetFail) } - if SetSuccess != s.Stats.SetSuccess { - t.Fatalf("SetSuccess [%d] != Stats.SetSuccess [%d]", SetSuccess, s.Stats.SetSuccess) + if CreateSuccess != s.Stats.CreateSuccess { + t.Fatalf("CreateSuccess [%d] != Stats.CreateSuccess [%d]", CreateSuccess, s.Stats.CreateSuccess) } - if SetFail != s.Stats.SetFail { - t.Fatalf("SetFail [%d] != Stats.SetFail [%d]", SetFail, s.Stats.SetFail) + if CreateFail != s.Stats.CreateFail { + t.Fatalf("CreateFail [%d] != Stats.CreateFail [%d]", CreateFail, s.Stats.CreateFail) } if DeleteSuccess != s.Stats.DeleteSuccess { @@ -141,22 +141,22 @@ func TestBasicStats(t *testing.T) { } s = newStore() - SetSuccess = 0 - SetFail = 0 + CreateSuccess = 0 + CreateFail = 0 for _, k := range keys { i++ _, err := s.Create(k, "bar", false, time.Now().Add(time.Second*3), i, 1) if err != nil { - SetFail++ + CreateFail++ } else { - SetSuccess++ + CreateSuccess++ } } time.Sleep(6 * time.Second) - ExpireCount := SetSuccess + ExpireCount := CreateSuccess if ExpireCount != s.Stats.ExpireCount { t.Fatalf("ExpireCount [%d] != Stats.ExpireCount [%d]", ExpireCount, s.Stats.ExpireCount) diff --git a/store/store.go b/store/store.go index 73d56bee5..008ac26d5 100644 --- a/store/store.go +++ b/store/store.go @@ -16,6 +16,7 @@ import ( type Store interface { Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error) Set(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) + Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error) Create(nodePath string, value string, incrementalSuffix bool, expireTime time.Time, index uint64, term uint64) (*Event, error) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64, @@ -113,7 +114,15 @@ func (s *store) Create(nodePath string, value string, unique bool, s.worldLock.Lock() defer s.worldLock.Unlock() - return s.internalCreate(nodePath, value, unique, false, expireTime, index, term, Create) + e, err := s.internalCreate(nodePath, value, unique, false, expireTime, index, term, Create) + + if err == nil { + s.Stats.Inc(CreateSuccess) + } else { + s.Stats.Inc(CreateFail) + } + + return e, err } // Set function creates or replace the Node at nodePath. @@ -122,7 +131,15 @@ func (s *store) Set(nodePath string, value string, expireTime time.Time, index u s.worldLock.Lock() defer s.worldLock.Unlock() - return s.internalCreate(nodePath, value, false, true, expireTime, index, term, Set) + e, err := s.internalCreate(nodePath, value, false, true, expireTime, index, term, Set) + + if err == nil { + s.Stats.Inc(SetSuccess) + } else { + s.Stats.Inc(SetFail) + } + + return e, err } func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64, @@ -133,10 +150,6 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint s.worldLock.Lock() defer s.worldLock.Unlock() - if prevValue == "" && prevIndex == 0 { // try just update - return s.update(nodePath, value, expireTime, index, term) - } - n, err := s.internalGet(nodePath, index, term) if err != nil { @@ -265,7 +278,7 @@ func (s *store) walk(nodePath string, walkFunc func(prev *Node, component string // Update function updates the value/ttl of the node. // If the node is a file, the value and the ttl can be updated. // If the node is a directory, only the ttl can be updated. -func (s *store) update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error) { +func (s *store) Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error) { s.worldLock.Lock() defer s.worldLock.Unlock() nodePath = path.Clean(path.Join("/", nodePath)) @@ -354,12 +367,8 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla } - err = d.Add(n) - - if err != nil { - s.Stats.Inc(SetFail) - return nil, err - } + // we are sure d is a directory and does not have the children with name n.Name + d.Add(n) // Node with TTL if expireTime.Sub(Permanent) != 0 { @@ -368,7 +377,6 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla } s.WatcherHub.notify(e) - s.Stats.Inc(SetSuccess) return e, nil } diff --git a/store/store_test.go b/store/store_test.go index dd6a2f818..20b1131c3 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -74,7 +74,7 @@ func TestUpdateFile(t *testing.T) { t.Fatalf("cannot create %s=bar [%s]", "/foo/bar", err.Error()) } - _, err = s.update("/foo/bar", "barbar", Permanent, 2, 1) + _, err = s.Update("/foo/bar", "barbar", Permanent, 2, 1) if err != nil { t.Fatalf("cannot update %s=barbar [%s]", "/foo/bar", err.Error()) @@ -114,7 +114,7 @@ func TestUpdateFile(t *testing.T) { } expire := time.Now().Add(time.Second * 2) - _, err = s.update("/foo/foo", "", expire, 7, 1) + _, err = s.Update("/foo/foo", "", expire, 7, 1) if err != nil { t.Fatalf("cannot update dir [%s] [%s]", "/foo/foo", err.Error()) } @@ -331,7 +331,7 @@ func TestWatch(t *testing.T) { } c, _ = s.Watch("/foo/foo/foo", false, 0, 1, 1) - s.update("/foo/foo/foo", "car", Permanent, 2, 1) + s.Update("/foo/foo/foo", "car", Permanent, 2, 1) e = nonblockingRetrive(c) if e.Key != "/foo/foo/foo" || e.Action != Update { t.Fatal("watch for Update node fails ", e) @@ -360,7 +360,7 @@ func TestWatch(t *testing.T) { } c, _ = s.Watch("/foo", true, 0, 5, 1) - s.update("/foo/foo/boo", "foo", Permanent, 6, 1) + s.Update("/foo/foo/boo", "foo", Permanent, 6, 1) e = nonblockingRetrive(c) if e.Key != "/foo/foo/boo" || e.Action != Update { t.Fatal("watch for Update subdirectory fails") @@ -390,7 +390,7 @@ func TestWatch(t *testing.T) { } s.Create("/foo/foo/boo", "foo", false, Permanent, 10, 1) - s.update("/foo/foo/boo", "bar", time.Now().Add(time.Second*1), 11, 1) + s.Update("/foo/foo/boo", "bar", time.Now().Add(time.Second*1), 11, 1) c, _ = s.Watch("/foo", true, 0, 11, 1) time.Sleep(time.Second * 2) e = nonblockingRetrive(c) diff --git a/store/update_command.go b/store/update_command.go new file mode 100644 index 000000000..8e353cdd0 --- /dev/null +++ b/store/update_command.go @@ -0,0 +1,37 @@ +package store + +import ( + "github.com/coreos/etcd/log" + "github.com/coreos/go-raft" + "time" +) + +func init() { + raft.RegisterCommand(&UpdateCommand{}) +} + +// Update command +type UpdateCommand struct { + Key string `json:"key"` + Value string `json:"value"` + ExpireTime time.Time `json:"expireTime"` +} + +// The name of the update command in the log +func (c *UpdateCommand) CommandName() string { + return "etcd:update" +} + +// Create node +func (c *UpdateCommand) Apply(server raft.Server) (interface{}, error) { + s, _ := server.StateMachine().(Store) + + e, err := s.Update(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) + + if err != nil { + log.Debug(err) + return nil, err + } + + return e, nil +} From e680f28c2f02ce819f2d6e7f1a4f499aa14d8f86 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 15 Oct 2013 23:25:12 -0700 Subject: [PATCH 12/12] fix move update check to update handler --- server/v2/put_handler.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/server/v2/put_handler.go b/server/v2/put_handler.go index 5dcf061c2..336366d38 100644 --- a/server/v2/put_handler.go +++ b/server/v2/put_handler.go @@ -23,11 +23,6 @@ func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error { return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm) } - // Update should give at least one option - if value == "" && expireTime.Sub(store.Permanent) == 0 { - return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm) - } - prevValue, valueOk := req.Form["prevValue"] prevIndexStr, indexOk := req.Form["prevIndex"] prevExist, existOk := req.Form["prevExist"] @@ -100,6 +95,11 @@ func CreateHandler(w http.ResponseWriter, req *http.Request, s Server, key, valu } func UpdateHandler(w http.ResponseWriter, req *http.Request, s Server, key, value string, expireTime time.Time) error { + // Update should give at least one option + if value == "" && expireTime.Sub(store.Permanent) == 0 { + return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm) + } + c := &store.UpdateCommand{ Key: key, Value: value,