From b8b81d5b03798835e8519acba3876c6416664ac6 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 3 Oct 2013 08:59:05 -0700 Subject: [PATCH] feat(store) create node with incremental suffix. accept #190 in new API --- command.go | 11 +++++---- etcd_handlers.go | 11 +++++++-- store/stats_test.go | 4 +-- store/store.go | 15 +++++++++--- store/store_test.go | 60 ++++++++++++++++++++++----------------------- 5 files changed, 58 insertions(+), 43 deletions(-) diff --git a/command.go b/command.go index bfda5f270..3f007f13a 100644 --- a/command.go +++ b/command.go @@ -27,9 +27,10 @@ type Command interface { // Create command type CreateCommand struct { - Key string `json:"key"` - Value string `json:"value"` - ExpireTime time.Time `json:"expireTime"` + Key string `json:"key"` + Value string `json:"value"` + ExpireTime time.Time `json:"expireTime"` + IncrementalSuffix bool `json:"incrementalSuffix"` } // The name of the create command in the log @@ -39,7 +40,7 @@ func (c *CreateCommand) CommandName() string { // Create node func (c *CreateCommand) Apply(server *raft.Server) (interface{}, error) { - e, err := etcdStore.Create(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) + e, err := etcdStore.Create(c.Key, c.Value, c.IncrementalSuffix, c.ExpireTime, server.CommitIndex(), server.Term()) if err != nil { debug(err) @@ -221,7 +222,7 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { // add machine in etcd storage key := path.Join("_etcd/machines", c.Name) value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion) - etcdStore.Create(key, value, store.Permanent, raftServer.CommitIndex(), raftServer.Term()) + etcdStore.Create(key, value, false, store.Permanent, raftServer.CommitIndex(), raftServer.Term()) // add peer stats if c.Name != r.Name() { diff --git a/etcd_handlers.go b/etcd_handlers.go index 4e802a5a0..5681cf6dd 100644 --- a/etcd_handlers.go +++ b/etcd_handlers.go @@ -75,6 +75,7 @@ func Multiplexer(w http.ResponseWriter, req *http.Request) error { w.WriteHeader(http.StatusMethodNotAllowed) return nil } + return nil } @@ -102,6 +103,10 @@ func CreateHttpHandler(w http.ResponseWriter, req *http.Request) error { ExpireTime: expireTime, } + if req.FormValue("incremental") == "true" { + command.IncrementalSuffix = true + } + return dispatchEtcdCommand(command, w, req) } @@ -201,6 +206,7 @@ func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error { w.WriteHeader(http.StatusOK) raftURL, _ := nameToRaftURL(leader) w.Write([]byte(raftURL)) + return nil } else { return etcdErr.NewError(etcdErr.EcodeLeaderElect, "") @@ -213,6 +219,7 @@ func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error { w.WriteHeader(http.StatusOK) w.Write([]byte(strings.Join(machines, ", "))) + return nil } @@ -220,6 +227,7 @@ func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error { func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error { w.WriteHeader(http.StatusOK) fmt.Fprintf(w, "etcd %s", releaseVersion) + return nil } @@ -277,7 +285,6 @@ func GetHttpHandler(w http.ResponseWriter, req *http.Request) error { } indexStr := req.FormValue("wait_index") - if indexStr != "" { sinceIndex, err := strconv.ParseUint(indexStr, 10, 64) @@ -297,7 +304,6 @@ func GetHttpHandler(w http.ResponseWriter, req *http.Request) error { } sorted := req.FormValue("sorted") - if sorted == "true" { command.Sorted = true } @@ -330,6 +336,7 @@ func TestHttpHandler(w http.ResponseWriter, req *http.Request) { directSet() w.WriteHeader(http.StatusOK) w.Write([]byte("speed test success")) + return } diff --git a/store/stats_test.go b/store/stats_test.go index 207df825f..80f71ff88 100644 --- a/store/stats_test.go +++ b/store/stats_test.go @@ -16,7 +16,7 @@ func TestBasicStats(t *testing.T) { for _, k := range keys { i++ - _, err := s.Create(k, "bar", time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1) + _, err := s.Create(k, "bar", false, time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1) if err != nil { SetFail++ } else { @@ -146,7 +146,7 @@ func TestBasicStats(t *testing.T) { for _, k := range keys { i++ - _, err := s.Create(k, "bar", time.Now().Add(time.Second*3), i, 1) + _, err := s.Create(k, "bar", false, time.Now().Add(time.Second*3), i, 1) if err != nil { SetFail++ } else { diff --git a/store/store.go b/store/store.go index 0f6a425f7..0113e8b11 100644 --- a/store/store.go +++ b/store/store.go @@ -5,6 +5,7 @@ import ( "fmt" "path" "sort" + "strconv" "strings" "sync" "time" @@ -94,10 +95,12 @@ func (s *Store) Get(nodePath string, recursive, sorted bool, index uint64, term // Create function creates the Node at nodePath. Create will help to create intermediate directories with no ttl. // If the node has already existed, create will fail. // If any node on the path is a file, create will fail. -func (s *Store) Create(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { +func (s *Store) Create(nodePath string, value string, incrementalSuffix bool, + expireTime time.Time, index uint64, term uint64) (*Event, error) { + s.worldLock.Lock() defer s.worldLock.Unlock() - return s.internalCreate(nodePath, value, expireTime, index, term, Create) + return s.internalCreate(nodePath, value, incrementalSuffix, expireTime, index, term, Create) } // Update function updates the value/ttl of the node. @@ -155,7 +158,7 @@ func (s *Store) TestAndSet(nodePath string, prevValue string, prevIndex uint64, defer s.worldLock.Unlock() if prevValue == "" && prevIndex == 0 { // try create - return s.internalCreate(nodePath, value, expireTime, index, term, TestAndSet) + return s.internalCreate(nodePath, value, false, expireTime, index, term, TestAndSet) } n, err := s.internalGet(nodePath, index, term) @@ -262,7 +265,11 @@ func (s *Store) walk(nodePath string, walkFunc func(prev *Node, component string return curr, nil } -func (s *Store) internalCreate(nodePath string, value string, expireTime time.Time, index uint64, term uint64, action string) (*Event, error) { +func (s *Store) internalCreate(nodePath string, value string, incrementalSuffix bool, expireTime time.Time, index uint64, term uint64, action string) (*Event, error) { + if incrementalSuffix { // append unique incremental suffix to the node path + nodePath += "_" + strconv.FormatUint(index, 10) + } + nodePath = path.Clean(path.Join("/", nodePath)) // make sure we can create the node diff --git a/store/store_test.go b/store/store_test.go index 37dba4f14..5a672d9b0 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -10,10 +10,10 @@ import ( func TestCreateAndGet(t *testing.T) { s := New() - s.Create("/foobar", "bar", Permanent, 1, 1) + s.Create("/foobar", "bar", false, Permanent, 1, 1) // already exist, create should fail - _, err := s.Create("/foobar", "bar", Permanent, 1, 1) + _, err := s.Create("/foobar", "bar", false, Permanent, 1, 1) if err == nil { t.Fatal("Create should fail") @@ -27,14 +27,14 @@ func TestCreateAndGet(t *testing.T) { createAndGet(s, "/foo/foo/bar", t) // meet file, create should fail - _, err = s.Create("/foo/bar/bar", "bar", Permanent, 2, 1) + _, err = s.Create("/foo/bar/bar", "bar", false, Permanent, 2, 1) if err == nil { t.Fatal("Create should fail") } // create a directory - _, err = s.Create("/fooDir", "", Permanent, 3, 1) + _, err = s.Create("/fooDir", "", false, Permanent, 3, 1) if err != nil { t.Fatal("Cannot create /fooDir") @@ -47,7 +47,7 @@ func TestCreateAndGet(t *testing.T) { } // create a file under directory - _, err = s.Create("/fooDir/bar", "bar", Permanent, 4, 1) + _, err = s.Create("/fooDir/bar", "bar", false, Permanent, 4, 1) if err != nil { t.Fatal("Cannot create /fooDir/bar = bar") @@ -57,7 +57,7 @@ func TestCreateAndGet(t *testing.T) { func TestUpdateFile(t *testing.T) { s := New() - _, err := s.Create("/foo/bar", "bar", Permanent, 1, 1) + _, err := s.Create("/foo/bar", "bar", false, Permanent, 1, 1) if err != nil { t.Fatalf("cannot create %s=bar [%s]", "/foo/bar", err.Error()) @@ -80,24 +80,24 @@ func TestUpdateFile(t *testing.T) { } // create a directory, update its ttl, to see if it will be deleted - _, err = s.Create("/foo/foo", "", Permanent, 3, 1) + _, err = s.Create("/foo/foo", "", false, Permanent, 3, 1) if err != nil { t.Fatalf("cannot create dir [%s] [%s]", "/foo/foo", err.Error()) } - _, err = s.Create("/foo/foo/foo1", "bar1", Permanent, 4, 1) + _, err = s.Create("/foo/foo/foo1", "bar1", false, Permanent, 4, 1) if err != nil { t.Fatal("cannot create [%s]", err.Error()) } - _, err = s.Create("/foo/foo/foo2", "", Permanent, 5, 1) + _, err = s.Create("/foo/foo/foo2", "", false, Permanent, 5, 1) if err != nil { t.Fatal("cannot create [%s]", err.Error()) } - _, err = s.Create("/foo/foo/foo2/boo", "boo1", Permanent, 6, 1) + _, err = s.Create("/foo/foo/foo2/boo", "boo1", false, Permanent, 6, 1) if err != nil { t.Fatal("cannot create [%s]", err.Error()) } @@ -158,11 +158,11 @@ func TestListDirectory(t *testing.T) { // create dir /foo // set key-value /foo/foo=bar - s.Create("/foo/foo", "bar", Permanent, 1, 1) + s.Create("/foo/foo", "bar", false, Permanent, 1, 1) // create dir /foo/fooDir // set key-value /foo/fooDir/foo=bar - s.Create("/foo/fooDir/foo", "bar", Permanent, 2, 1) + s.Create("/foo/fooDir/foo", "bar", false, Permanent, 2, 1) e, err := s.Get("/foo", true, false, 2, 1) @@ -189,7 +189,7 @@ func TestListDirectory(t *testing.T) { // create dir /foo/_hidden // set key-value /foo/_hidden/foo -> bar - s.Create("/foo/_hidden/foo", "bar", Permanent, 3, 1) + s.Create("/foo/_hidden/foo", "bar", false, Permanent, 3, 1) e, _ = s.Get("/foo", false, false, 2, 1) @@ -201,7 +201,7 @@ func TestListDirectory(t *testing.T) { func TestRemove(t *testing.T) { s := New() - s.Create("/foo", "bar", Permanent, 1, 1) + s.Create("/foo", "bar", false, Permanent, 1, 1) _, err := s.Delete("/foo", false, 1, 1) if err != nil { @@ -214,9 +214,9 @@ func TestRemove(t *testing.T) { t.Fatalf("can get the node after deletion") } - s.Create("/foo/bar", "bar", Permanent, 1, 1) - s.Create("/foo/car", "car", Permanent, 1, 1) - s.Create("/foo/dar/dar", "dar", Permanent, 1, 1) + s.Create("/foo/bar", "bar", false, Permanent, 1, 1) + s.Create("/foo/car", "car", false, Permanent, 1, 1) + s.Create("/foo/dar/dar", "dar", false, Permanent, 1, 1) _, err = s.Delete("/foo", false, 1, 1) @@ -242,7 +242,7 @@ func TestExpire(t *testing.T) { expire := time.Now().Add(time.Second) - s.Create("/foo", "bar", expire, 1, 1) + s.Create("/foo", "bar", false, expire, 1, 1) _, err := s.Get("/foo", false, false, 1, 1) @@ -260,7 +260,7 @@ func TestExpire(t *testing.T) { // test if we can reach the node before expiration expire = time.Now().Add(time.Second) - s.Create("/foo", "bar", expire, 1, 1) + s.Create("/foo", "bar", false, expire, 1, 1) time.Sleep(time.Millisecond * 50) _, err = s.Get("/foo", false, false, 1, 1) @@ -271,7 +271,7 @@ func TestExpire(t *testing.T) { expire = time.Now().Add(time.Second) - s.Create("/foo", "bar", expire, 1, 1) + s.Create("/foo", "bar", false, expire, 1, 1) _, err = s.Delete("/foo", false, 1, 1) if err != nil { @@ -281,7 +281,7 @@ func TestExpire(t *testing.T) { func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ? s := New() - s.Create("/foo", "bar", Permanent, 1, 1) + s.Create("/foo", "bar", false, Permanent, 1, 1) // test on wrong previous value _, err := s.TestAndSet("/foo", "barbar", 0, "car", Permanent, 2, 1) @@ -316,7 +316,7 @@ func TestWatch(t *testing.T) { s := New() // watch at a deeper path c, _ := s.Watch("/foo/foo/foo", false, 0, 0, 1) - s.Create("/foo/foo/foo", "bar", Permanent, 1, 1) + s.Create("/foo/foo/foo", "bar", false, Permanent, 1, 1) e := nonblockingRetrive(c) if e.Key != "/foo/foo/foo" || e.Action != Create { @@ -346,7 +346,7 @@ func TestWatch(t *testing.T) { // watch at a prefix c, _ = s.Watch("/foo", true, 0, 4, 1) - s.Create("/foo/foo/boo", "bar", Permanent, 5, 1) + s.Create("/foo/foo/boo", "bar", false, Permanent, 5, 1) e = nonblockingRetrive(c) if e.Key != "/foo/foo/boo" || e.Action != Create { t.Fatal("watch for Create subdirectory fails") @@ -374,7 +374,7 @@ func TestWatch(t *testing.T) { } // watch expire - s.Create("/foo/foo/boo", "foo", time.Now().Add(time.Second*1), 9, 1) + s.Create("/foo/foo/boo", "foo", false, time.Now().Add(time.Second*1), 9, 1) c, _ = s.Watch("/foo", true, 0, 9, 1) time.Sleep(time.Second * 2) e = nonblockingRetrive(c) @@ -382,7 +382,7 @@ func TestWatch(t *testing.T) { t.Fatal("watch for Expiration of Create() subdirectory fails ", e) } - s.Create("/foo/foo/boo", "foo", Permanent, 10, 1) + s.Create("/foo/foo/boo", "foo", false, Permanent, 10, 1) s.Update("/foo/foo/boo", "bar", time.Now().Add(time.Second*1), 11, 1) c, _ = s.Watch("/foo", true, 0, 11, 1) time.Sleep(time.Second * 2) @@ -391,7 +391,7 @@ func TestWatch(t *testing.T) { t.Fatal("watch for Expiration of Update() subdirectory fails ", e) } - s.Create("/foo/foo/boo", "foo", Permanent, 12, 1) + s.Create("/foo/foo/boo", "foo", false, Permanent, 12, 1) s.TestAndSet("/foo/foo/boo", "foo", 0, "bar", time.Now().Add(time.Second*1), 13, 1) c, _ = s.Watch("/foo", true, 0, 13, 1) time.Sleep(time.Second * 2) @@ -409,7 +409,7 @@ func TestSort(t *testing.T) { i := uint64(1) for _, k := range keys { - _, err := s.Create(k, "bar", Permanent, i, 1) + _, err := s.Create(k, "bar", false, Permanent, i, 1) if err != nil { panic(err) } else { @@ -447,7 +447,7 @@ func TestSaveAndRecover(t *testing.T) { i := uint64(1) for _, k := range keys { - _, err := s.Create(k, "bar", Permanent, i, 1) + _, err := s.Create(k, "bar", false, Permanent, i, 1) if err != nil { panic(err) } else { @@ -459,7 +459,7 @@ func TestSaveAndRecover(t *testing.T) { // test if we can reach the node before expiration expire := time.Now().Add(time.Second) - s.Create("/foo/foo", "bar", expire, 1, 1) + s.Create("/foo/foo", "bar", false, expire, 1, 1) b, err := s.Save() cloneFs := New() @@ -514,7 +514,7 @@ func GenKeys(num int, depth int) []string { } func createAndGet(s *Store, path string, t *testing.T) { - _, err := s.Create(path, "bar", Permanent, 1, 1) + _, err := s.Create(path, "bar", false, Permanent, 1, 1) if err != nil { t.Fatalf("cannot create %s=bar [%s]", path, err.Error())