From 3ff100321c98fbdaf0001b804f102b82c842a24b Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 14 Sep 2013 15:13:33 -0400 Subject: [PATCH] use new store system --- command.go | 110 ++++++++++++++--- etcd.go | 9 +- etcd_handlers.go | 210 +++++++++++++++++++------------- etcd_test.go | 5 +- file_system/file_system.go | 31 +++-- file_system/file_system_test.go | 12 +- raft_handlers.go | 3 +- raft_server.go | 3 + util.go | 8 +- 9 files changed, 266 insertions(+), 125 deletions(-) diff --git a/command.go b/command.go index d03b4dc2e..0a7cc1c97 100644 --- a/command.go +++ b/command.go @@ -4,12 +4,12 @@ import ( "encoding/binary" "encoding/json" "fmt" - etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/etcd/store" - "github.com/coreos/go-raft" "os" "path" "time" + + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/go-raft" ) const commandPrefix = "etcd:" @@ -24,6 +24,54 @@ type Command interface { Apply(server *raft.Server) (interface{}, error) } +// Create command +type CreateCommand struct { + Key string `json:"key"` + Value string `json:"value"` + ExpireTime time.Time `json:"expireTime"` +} + +// The name of the create command in the log +func (c *CreateCommand) CommandName() string { + return commandName("create") +} + +// Create node +func (c *CreateCommand) Apply(server *raft.Server) (interface{}, error) { + e, err := etcdFs.Create(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) + + if err != nil { + debug(err) + return nil, err + } + + return json.Marshal(e) +} + +// Update command +type UpdateCommand struct { + Key string `json:"key"` + Value string `json:"value"` + ExpireTime time.Time `json:"expireTime"` +} + +// The name of the update command in the log +func (c *UpdateCommand) CommandName() string { + return commandName("update") +} + +// Update node +func (c *UpdateCommand) Apply(server *raft.Server) (interface{}, error) { + e, err := etcdFs.Update(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) + + if err != nil { + debug(err) + return nil, err + } + + return json.Marshal(e) +} + // Set command type SetCommand struct { Key string `json:"key"` @@ -45,8 +93,9 @@ func (c *SetCommand) Apply(server *raft.Server) (interface{}, error) { type TestAndSetCommand struct { Key string `json:"key"` Value string `json:"value"` - PrevValue string `json: prevValue` ExpireTime time.Time `json:"expireTime"` + PrevValue string `json: prevValue` + PrevIndex uint64 `json: prevValue` } // The name of the testAndSet command in the log @@ -56,12 +105,22 @@ func (c *TestAndSetCommand) CommandName() string { // Set the key-value pair if the current value of the key equals to the given prevValue func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) { - return etcdStore.TestAndSet(c.Key, c.PrevValue, c.Value, c.ExpireTime, server.CommitIndex()) + e, err := etcdFs.TestAndSet(c.Key, c.PrevValue, c.PrevIndex, + c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) + + if err != nil { + debug(err) + return nil, err + } + + return json.Marshal(e) } // Get command type GetCommand struct { - Key string `json:"key"` + Key string `json:"key"` + Recursive bool `json:"recursive"` + Sorted bool `json:"sorted"` } // The name of the get command in the log @@ -71,12 +130,20 @@ func (c *GetCommand) CommandName() string { // Get the value of key func (c *GetCommand) Apply(server *raft.Server) (interface{}, error) { - return etcdStore.Get(c.Key) + e, err := etcdFs.Get(c.Key, c.Recursive, c.Sorted, server.CommitIndex(), server.Term()) + + if err != nil { + debug(err) + return nil, err + } + + return json.Marshal(e) } // Delete command type DeleteCommand struct { - Key string `json:"key"` + Key string `json:"key"` + Recursive bool `json:"recursive"` } // The name of the delete command in the log @@ -86,13 +153,21 @@ func (c *DeleteCommand) CommandName() string { // Delete the key func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) { - return etcdStore.Delete(c.Key, server.CommitIndex()) + e, err := etcdFs.Delete(c.Key, c.Recursive, server.CommitIndex(), server.Term()) + + if err != nil { + debug(err) + return nil, err + } + + return json.Marshal(e) } // Watch command type WatchCommand struct { Key string `json:"key"` SinceIndex uint64 `json:"sinceIndex"` + Recursive bool `json:"recursive"` } // The name of the watch command in the log @@ -101,20 +176,15 @@ func (c *WatchCommand) CommandName() string { } func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) { - // create a new watcher - watcher := store.NewWatcher() + eventChan, err := etcdFs.Watch(c.Key, c.Recursive, c.SinceIndex, server.CommitIndex(), server.Term()) - // add to the watchers list - etcdStore.AddWatcher(c.Key, watcher, c.SinceIndex) - - // wait for the notification for any changing - res := <-watcher.C - - if res == nil { - return nil, fmt.Errorf("Clearing watch") + if err != nil { + return nil, err } - return json.Marshal(res) + e := <-eventChan + + return json.Marshal(e) } // JoinCommand diff --git a/etcd.go b/etcd.go index 46546e8cc..bffb4e02e 100644 --- a/etcd.go +++ b/etcd.go @@ -3,12 +3,14 @@ package main import ( "crypto/tls" "flag" - "github.com/coreos/etcd/store" - "github.com/coreos/go-raft" "io/ioutil" "os" "strings" "time" + + "github.com/coreos/etcd/file_system" + "github.com/coreos/etcd/store" + "github.com/coreos/go-raft" ) //------------------------------------------------------------------------------ @@ -129,6 +131,7 @@ type TLSConfig struct { //------------------------------------------------------------------------------ var etcdStore *store.Store +var etcdFs *fileSystem.FileSystem //------------------------------------------------------------------------------ // @@ -195,6 +198,8 @@ func main() { // Create etcd key-value store etcdStore = store.CreateStore(maxSize) + etcdFs = fileSystem.New() + snapConf = newSnapshotConf() // Create etcd and raft server diff --git a/etcd_handlers.go b/etcd_handlers.go index 6b5203ec4..e1c6a8b9e 100644 --- a/etcd_handlers.go +++ b/etcd_handlers.go @@ -2,12 +2,12 @@ package main import ( "fmt" - etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/etcd/store" - "github.com/coreos/go-raft" "net/http" "strconv" "strings" + + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/go-raft" ) //------------------------------------------------------------------- @@ -18,7 +18,6 @@ func NewEtcdMuxer() *http.ServeMux { // external commands etcdMux := http.NewServeMux() etcdMux.Handle("/"+version+"/keys/", errorHandler(Multiplexer)) - etcdMux.Handle("/"+version+"/watch/", errorHandler(WatchHttpHandler)) etcdMux.Handle("/"+version+"/leader", errorHandler(LeaderHttpHandler)) etcdMux.Handle("/"+version+"/machines", errorHandler(MachinesHttpHandler)) etcdMux.Handle("/"+version+"/stats", errorHandler(StatsHttpHandler)) @@ -47,15 +46,16 @@ func Multiplexer(w http.ResponseWriter, req *http.Request) error { case "GET": return GetHttpHandler(w, req) case "POST": - return SetHttpHandler(w, req) + return CreateHttpHandler(w, req) case "PUT": - return SetHttpHandler(w, req) + return UpdateHttpHandler(w, req) case "DELETE": return DeleteHttpHandler(w, req) default: w.WriteHeader(http.StatusMethodNotAllowed) return nil } + return nil } //-------------------------------------- @@ -63,63 +63,102 @@ func Multiplexer(w http.ResponseWriter, req *http.Request) error { // Set/Delete will dispatch to leader //-------------------------------------- -// Set Command Handler -func SetHttpHandler(w http.ResponseWriter, req *http.Request) error { - key := req.URL.Path[len("/v1/keys/"):] +func CreateHttpHandler(w http.ResponseWriter, req *http.Request) error { + key := req.URL.Path[len("/v2/keys"):] - if store.CheckKeyword(key) { - return etcdErr.NewError(etcdErr.EcodeKeyIsPreserved, "Set") - } - - debugf("[recv] POST %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr) + debugf("recv.post[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL) value := req.FormValue("value") - if len(value) == 0 { - return etcdErr.NewError(etcdErr.EcodeValueRequired, "Set") + ttl := req.FormValue("ttl") + + expireTime, err := durationToExpireTime(ttl) + + if err != nil { + return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create") + } + + command := &CreateCommand{ + Key: key, + Value: value, + ExpireTime: expireTime, + } + + return dispatch(command, w, req, true) + +} + +func UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error { + key := req.URL.Path[len("/v2/keys"):] + + debugf("recv.put[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL) + + value := req.FormValue("value") + + ttl := req.FormValue("ttl") + + expireTime, err := durationToExpireTime(ttl) + + if err != nil { + return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update") + } + + // TODO: update should give at least one option + if value == "" && ttl == "" { + return nil } prevValue := req.FormValue("prevValue") - strDuration := req.FormValue("ttl") + prevIndexStr := req.FormValue("prevIndex") - expireTime, err := durationToExpireTime(strDuration) + if prevValue == "" && prevIndexStr == "" { // update without test + command := &UpdateCommand{ + Key: key, + Value: value, + ExpireTime: expireTime, + } - if err != nil { - return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Set") - } + return dispatch(command, w, req, true) + + } else { // update with test + var prevIndex uint64 + + if prevIndexStr != "" { + prevIndex, err = strconv.ParseUint(prevIndexStr, 10, 64) + } + + // TODO: add error type + if err != nil { + return nil + } - if len(prevValue) != 0 { command := &TestAndSetCommand{ - Key: key, - Value: value, - PrevValue: prevValue, - ExpireTime: expireTime, - } - - return dispatch(command, w, req, true) - - } else { - command := &SetCommand{ - Key: key, - Value: value, - ExpireTime: expireTime, + Key: key, + Value: value, + PrevValue: prevValue, + PrevIndex: prevIndex, } return dispatch(command, w, req, true) } + } // Delete Handler func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error { - key := req.URL.Path[len("/v1/keys/"):] + key := req.URL.Path[len("/v2/keys"):] - debugf("[recv] DELETE %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr) + debugf("recv.delete[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL) command := &DeleteCommand{ Key: key, } + if req.FormValue("recursive") == "true" { + command.Recursive = true + } + return dispatch(command, w, req, true) } @@ -212,69 +251,68 @@ func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error { return nil } -// Get Handler func GetHttpHandler(w http.ResponseWriter, req *http.Request) error { - key := req.URL.Path[len("/v1/keys/"):] + var err error + var event interface{} + key := req.URL.Path[len("/v1/keys"):] - debugf("[recv] GET %s/v1/keys/%s [%s]", e.url, key, req.RemoteAddr) + debugf("recv.get[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL) - command := &GetCommand{ - Key: key, + recursive := req.FormValue("recursive") + + if req.FormValue("wait") == "true" { + command := &WatchCommand{ + Key: key, + } + + if recursive == "true" { + command.Recursive = true + } + + indexStr := req.FormValue("wait_index") + + if indexStr != "" { + sinceIndex, err := strconv.ParseUint(indexStr, 10, 64) + + if err != nil { + return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index") + } + + command.SinceIndex = sinceIndex + } + + event, err = command.Apply(r.Server) + + } else { + command := &GetCommand{ + Key: key, + } + + sorted := req.FormValue("sorted") + + if sorted == "true" { + command.Sorted = true + } + + if recursive == "true" { + command.Recursive = true + } + + event, err = command.Apply(r.Server) } - if body, err := command.Apply(r.Server); err != nil { + if err != nil { return err } else { - body, _ := body.([]byte) + event, _ := event.([]byte) w.WriteHeader(http.StatusOK) - w.Write(body) + w.Write(event) return nil } } -// Watch handler -func WatchHttpHandler(w http.ResponseWriter, req *http.Request) error { - key := req.URL.Path[len("/v1/watch/"):] - - command := &WatchCommand{ - Key: key, - } - - if req.Method == "GET" { - debugf("[recv] GET %s/watch/%s [%s]", e.url, key, req.RemoteAddr) - command.SinceIndex = 0 - - } else if req.Method == "POST" { - // watch from a specific index - - debugf("[recv] POST %s/watch/%s [%s]", e.url, key, req.RemoteAddr) - content := req.FormValue("index") - - sinceIndex, err := strconv.ParseUint(string(content), 10, 64) - if err != nil { - return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index") - } - command.SinceIndex = sinceIndex - - } else { - w.WriteHeader(http.StatusMethodNotAllowed) - return nil - } - - if body, err := command.Apply(r.Server); err != nil { - return etcdErr.NewError(etcdErr.EcodeWatcherCleared, key) - } else { - w.WriteHeader(http.StatusOK) - - body, _ := body.([]byte) - w.Write(body) - return nil - } - -} - // TestHandler func TestHttpHandler(w http.ResponseWriter, req *http.Request) { testType := req.URL.Path[len("/test/"):] diff --git a/etcd_test.go b/etcd_test.go index e61e7e4a8..caa6af84d 100644 --- a/etcd_test.go +++ b/etcd_test.go @@ -2,8 +2,6 @@ package main import ( "fmt" - "github.com/coreos/etcd/test" - "github.com/coreos/go-etcd/etcd" "math/rand" "net/http" "net/http/httptest" @@ -13,6 +11,9 @@ import ( "strings" "testing" "time" + + "github.com/coreos/etcd/test" + "github.com/coreos/go-etcd/etcd" ) // Create a single node and try to set value diff --git a/file_system/file_system.go b/file_system/file_system.go index 5d11f8f20..ee67f9655 100644 --- a/file_system/file_system.go +++ b/file_system/file_system.go @@ -27,6 +27,8 @@ func New() *FileSystem { } func (fs *FileSystem) Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error) { + nodePath = path.Clean(path.Join("/", nodePath)) + n, err := fs.InternalGet(nodePath, index, term) if err != nil { @@ -71,6 +73,11 @@ func (fs *FileSystem) Get(nodePath string, recursive, sorted bool, index uint64, e.Value = n.Value } + if n.ExpireTime.Sub(Permanent) != 0 { + e.Expiration = &n.ExpireTime + e.TTL = int64(n.ExpireTime.Sub(time.Now())/time.Second) + 1 + } + return e, nil } @@ -78,7 +85,7 @@ func (fs *FileSystem) Get(nodePath string, recursive, sorted bool, index uint64, // If the node has already existed, create will fail. // If any node on the path is a file, create will fail. func (fs *FileSystem) Create(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { - nodePath = path.Clean("/" + nodePath) + nodePath = path.Clean(path.Join("/", nodePath)) // make sure we can create the node _, err := fs.InternalGet(nodePath, index, term) @@ -125,10 +132,10 @@ func (fs *FileSystem) Create(nodePath string, value string, expireTime time.Time } // Node with TTL - if expireTime != Permanent { + if expireTime.Sub(Permanent) != 0 { n.Expire() e.Expiration = &n.ExpireTime - e.TTL = int64(expireTime.Sub(time.Now()) / time.Second) + e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1 } fs.WatcherHub.notify(e) @@ -164,7 +171,7 @@ func (fs *FileSystem) Update(nodePath string, value string, expireTime time.Time } // update ttl - if !n.IsPermanent() && expireTime != Permanent { + if !n.IsPermanent() { n.stopExpire <- true } @@ -172,7 +179,7 @@ func (fs *FileSystem) Update(nodePath string, value string, expireTime time.Time n.ExpireTime = expireTime n.Expire() e.Expiration = &n.ExpireTime - e.TTL = int64(expireTime.Sub(time.Now()) / time.Second) + e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1 } fs.WatcherHub.notify(e) @@ -205,7 +212,7 @@ func (fs *FileSystem) TestAndSet(nodePath string, prevValue string, prevIndex ui return e, nil } - cause := fmt.Sprintf("[%v/%v] [%v/%v]", prevValue, f.Value, prevIndex, f.ModifiedIndex) + cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, f.Value, prevIndex, f.ModifiedIndex) return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause) } @@ -241,6 +248,16 @@ func (fs *FileSystem) Delete(nodePath string, recursive bool, index uint64, term return e, nil } +func (fs *FileSystem) Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error) { + fs.Index, fs.Term = index, term + + if sinceIndex == 0 { + return fs.WatcherHub.watch(prefix, recursive, index+1) + } + + return fs.WatcherHub.watch(prefix, recursive, sinceIndex) +} + // walk function walks all the nodePath and apply the walkFunc on each directory func (fs *FileSystem) walk(nodePath string, walkFunc func(prev *Node, component string) (*Node, error)) (*Node, error) { components := strings.Split(nodePath, "/") @@ -265,7 +282,7 @@ func (fs *FileSystem) walk(nodePath string, walkFunc func(prev *Node, component // InternalGet function get the node of the given nodePath. func (fs *FileSystem) InternalGet(nodePath string, index uint64, term uint64) (*Node, error) { - nodePath = path.Clean("/" + nodePath) + nodePath = path.Clean(path.Join("/", nodePath)) // update file system known index and term fs.Index, fs.Term = index, term diff --git a/file_system/file_system_test.go b/file_system/file_system_test.go index 4da413da9..d36c86e2a 100644 --- a/file_system/file_system_test.go +++ b/file_system/file_system_test.go @@ -10,10 +10,7 @@ import ( func TestCreateAndGet(t *testing.T) { fs := New() - // this should create successfully - createAndGet(fs, "/foobar", t) - createAndGet(fs, "/foo/bar", t) - createAndGet(fs, "/foo/foo/bar", t) + fs.Create("/foobar", "bar", Permanent, 1, 1) // already exist, create should fail _, err := fs.Create("/foobar", "bar", Permanent, 1, 1) @@ -22,6 +19,13 @@ func TestCreateAndGet(t *testing.T) { t.Fatal("Create should fail") } + fs.Delete("/foobar", true, 1, 1) + + // this should create successfully + createAndGet(fs, "/foobar", t) + createAndGet(fs, "/foo/bar", t) + createAndGet(fs, "/foo/foo/bar", t) + // meet file, create should fail _, err = fs.Create("/foo/bar/bar", "bar", Permanent, 2, 1) diff --git a/raft_handlers.go b/raft_handlers.go index 8ae9d2f87..1a92560ec 100644 --- a/raft_handlers.go +++ b/raft_handlers.go @@ -2,8 +2,9 @@ package main import ( "encoding/json" - "github.com/coreos/go-raft" "net/http" + + "github.com/coreos/go-raft" ) //------------------------------------------------------------- diff --git a/raft_server.go b/raft_server.go index c8b86021c..1175f4665 100644 --- a/raft_server.go +++ b/raft_server.go @@ -277,4 +277,7 @@ func registerCommands() { raft.RegisterCommand(&DeleteCommand{}) raft.RegisterCommand(&WatchCommand{}) raft.RegisterCommand(&TestAndSetCommand{}) + + raft.RegisterCommand(&CreateCommand{}) + raft.RegisterCommand(&UpdateCommand{}) } diff --git a/util.go b/util.go index 22cbed641..96b1cf394 100644 --- a/util.go +++ b/util.go @@ -3,7 +3,6 @@ package main import ( "encoding/json" "fmt" - "github.com/coreos/etcd/web" "io" "log" "net" @@ -14,6 +13,9 @@ import ( "runtime/pprof" "strconv" "time" + + "github.com/coreos/etcd/file_system" + "github.com/coreos/etcd/web" ) //-------------------------------------- @@ -26,12 +28,12 @@ func durationToExpireTime(strDuration string) (time.Time, error) { duration, err := strconv.Atoi(strDuration) if err != nil { - return time.Unix(0, 0), err + return fileSystem.Permanent, err } return time.Now().Add(time.Second * (time.Duration)(duration)), nil } else { - return time.Unix(0, 0), nil + return fileSystem.Permanent, nil } }