feat add dir_flag

This commit is contained in:
Xiang Li 2013-12-05 17:10:37 -05:00
parent af20be8123
commit 40d297be66
15 changed files with 88 additions and 58 deletions

View File

@ -33,6 +33,7 @@ const (
EcodeNodeExist = 105
EcodeKeyIsPreserved = 106
EcodeRootROnly = 107
EcodeDirNotEmpty = 108
EcodeValueRequired = 200
EcodePrevValueRequired = 201
@ -59,6 +60,7 @@ func init() {
errors[EcodeNodeExist] = "Already exists" // create
errors[EcodeRootROnly] = "Root is read only"
errors[EcodeKeyIsPreserved] = "The prefix of given key is a keyword in etcd"
errors[EcodeDirNotEmpty] = "The directory is not empty"
// Post form related errors
errors[EcodeValueRequired] = "Value is Required in POST form"

View File

@ -45,7 +45,7 @@ func (r *Registry) Register(name string, peerURL string, url string) error {
// Write data to store.
key := path.Join(RegistryKey, name)
value := fmt.Sprintf("raft=%s&etcd=%s", peerURL, url)
_, err := r.store.Create(key, value, false, store.Permanent)
_, err := r.store.Create(key, false, value, false, store.Permanent)
log.Debugf("Register: %s", name)
return err
}
@ -59,7 +59,7 @@ func (r *Registry) Unregister(name string) error {
// delete(r.nodes, name)
// Remove the key from the store.
_, err := r.store.Delete(path.Join(RegistryKey, name), false)
_, err := r.store.Delete(path.Join(RegistryKey, name), false, false)
log.Debugf("Unregister: %s", name)
return err
}

View File

@ -46,14 +46,14 @@ func New(name string, urlStr string, bindAddr string, tlsConf *TLSConfig, tlsInf
TLSConfig: &tlsConf.Server,
Addr: bindAddr,
},
name: name,
store: store,
registry: registry,
url: urlStr,
tlsConf: tlsConf,
tlsInfo: tlsInfo,
peerServer: peerServer,
router: r,
name: name,
store: store,
registry: registry,
url: urlStr,
tlsConf: tlsConf,
tlsInfo: tlsInfo,
peerServer: peerServer,
router: r,
corsHandler: cors,
}
@ -377,7 +377,7 @@ func (s *Server) SpeedTestHandler(w http.ResponseWriter, req *http.Request) erro
for i := 0; i < count; i++ {
go func() {
for j := 0; j < 10; j++ {
c := s.Store().CommandFactory().CreateSetCommand("foo", "bar", time.Unix(0, 0))
c := s.Store().CommandFactory().CreateSetCommand("foo", false, "bar", time.Unix(0, 0))
s.peerServer.RaftServer().Do(c)
}
c <- true

View File

@ -9,6 +9,6 @@ import (
func DeleteKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
vars := mux.Vars(req)
key := "/" + vars["key"]
c := s.Store().CommandFactory().CreateDeleteCommand(key, false)
c := s.Store().CommandFactory().CreateDeleteCommand(key, false, false)
return s.Dispatch(c, w, req)
}

View File

@ -36,11 +36,11 @@ func SetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
c = s.Store().CommandFactory().CreateCompareAndSwapCommand(key, value, prevValueArr[0], 0, expireTime)
} else {
// test against existence
c = s.Store().CommandFactory().CreateCreateCommand(key, value, expireTime, false)
c = s.Store().CommandFactory().CreateCreateCommand(key, false, value, expireTime, false)
}
} else {
c = s.Store().CommandFactory().CreateSetCommand(key, value, expireTime)
c = s.Store().CommandFactory().CreateSetCommand(key, false, value, expireTime)
}
return s.Dispatch(c, w, req)

View File

@ -9,8 +9,10 @@ import (
func DeleteHandler(w http.ResponseWriter, req *http.Request, s Server) error {
vars := mux.Vars(req)
key := "/" + vars["key"]
recursive := (req.FormValue("recursive") == "true")
c := s.Store().CommandFactory().CreateDeleteCommand(key, recursive)
recursive := (req.FormValue("recursive") == "true")
dir := (req.FormValue("dir") == "true")
c := s.Store().CommandFactory().CreateDeleteCommand(key, dir, recursive)
return s.Dispatch(c, w, req)
}

View File

@ -13,11 +13,12 @@ func PostHandler(w http.ResponseWriter, req *http.Request, s Server) error {
key := "/" + vars["key"]
value := req.FormValue("value")
dir := (req.FormValue("dir") == "true")
expireTime, err := store.TTL(req.FormValue("ttl"))
if err != nil {
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", s.Store().Index())
}
c := s.Store().CommandFactory().CreateCreateCommand(key, value, expireTime, true)
c := s.Store().CommandFactory().CreateCreateCommand(key, dir, value, expireTime, true)
return s.Dispatch(c, w, req)
}

View File

@ -20,23 +20,25 @@ func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error {
req.ParseForm()
value := req.Form.Get("value")
dir := (req.FormValue("dir") == "true")
expireTime, err := store.TTL(req.Form.Get("ttl"))
if err != nil {
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", s.Store().Index())
}
_, valueOk := req.Form["prevValue"]
prevValue := req.Form.Get("prevValue")
prevValue := req.FormValue("prevValue")
_, indexOk := req.Form["prevIndex"]
prevIndexStr := req.Form.Get("prevIndex")
prevIndexStr := req.FormValue("prevIndex")
_, existOk := req.Form["prevExist"]
prevExist := req.Form.Get("prevExist")
prevExist := req.FormValue("prevExist")
// Set handler: create a new node or replace the old one.
if !valueOk && !indexOk && !existOk {
return SetHandler(w, req, s, key, value, expireTime)
return SetHandler(w, req, s, key, dir, value, expireTime)
}
// update with test
@ -44,7 +46,7 @@ func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error {
if prevExist == "false" {
// Create command: create a new node. Fail, if a node already exists
// Ignore prevIndex and prevValue
return CreateHandler(w, req, s, key, value, expireTime)
return CreateHandler(w, req, s, key, dir, value, expireTime)
}
if prevExist == "true" && !indexOk && !valueOk {
@ -75,13 +77,13 @@ func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error {
return s.Dispatch(c, w, req)
}
func SetHandler(w http.ResponseWriter, req *http.Request, s Server, key, value string, expireTime time.Time) error {
c := s.Store().CommandFactory().CreateSetCommand(key, value, expireTime)
func SetHandler(w http.ResponseWriter, req *http.Request, s Server, key string, dir bool, value string, expireTime time.Time) error {
c := s.Store().CommandFactory().CreateSetCommand(key, dir, value, expireTime)
return s.Dispatch(c, w, req)
}
func CreateHandler(w http.ResponseWriter, req *http.Request, s Server, key, value string, expireTime time.Time) error {
c := s.Store().CommandFactory().CreateCreateCommand(key, value, expireTime, false)
func CreateHandler(w http.ResponseWriter, req *http.Request, s Server, key string, dir bool, value string, expireTime time.Time) error {
c := s.Store().CommandFactory().CreateCreateCommand(key, dir, value, expireTime, false)
return s.Dispatch(c, w, req)
}

View File

@ -16,11 +16,12 @@ var minVersion, maxVersion int
type CommandFactory interface {
Version() int
CreateUpgradeCommand() raft.Command
CreateSetCommand(key string, value string, expireTime time.Time) raft.Command
CreateCreateCommand(key string, value string, expireTime time.Time, unique bool) raft.Command
CreateSetCommand(key string, dir bool, value string, expireTime time.Time) raft.Command
CreateCreateCommand(key string, dir bool, value string, expireTime time.Time, unique bool) raft.Command
CreateUpdateCommand(key string, value string, expireTime time.Time) raft.Command
CreateDeleteCommand(key string, recursive bool) raft.Command
CreateCompareAndSwapCommand(key string, value string, prevValue string, prevIndex uint64, expireTime time.Time) raft.Command
CreateDeleteCommand(key string, dir, recursive bool) raft.Command
CreateCompareAndSwapCommand(key string, value string, prevValue string,
prevIndex uint64, expireTime time.Time) raft.Command
CreateSyncCommand(now time.Time) raft.Command
}

View File

@ -175,11 +175,19 @@ func (n *node) Add(child *node) *etcdErr.Error {
}
// Remove function remove the node.
func (n *node) Remove(recursive bool, callback func(path string)) *etcdErr.Error {
func (n *node) Remove(dir, recursive bool, callback func(path string)) *etcdErr.Error {
if n.IsDir() && !recursive {
// cannot delete a directory without set recursive to true
return etcdErr.NewError(etcdErr.EcodeNotFile, "", n.store.Index())
if n.IsDir() {
if !dir {
// cannot delete a directory without set recursive to true
return etcdErr.NewError(etcdErr.EcodeNotFile, n.Path, n.store.Index())
}
if len(n.Children) != 0 && !recursive {
// cannot delete a directory if it is not empty and the operation
// is not recursive
return etcdErr.NewError(etcdErr.EcodeDirNotEmpty, n.Path, n.store.Index())
}
}
if !n.IsDir() { // key-value pair
@ -202,7 +210,7 @@ func (n *node) Remove(recursive bool, callback func(path string)) *etcdErr.Error
}
for _, child := range n.Children { // delete all children
child.Remove(true, callback)
child.Remove(true, true, callback)
}
// delete self

View File

@ -42,17 +42,20 @@ type Store interface {
Version() int
CommandFactory() CommandFactory
Index() uint64
Get(nodePath string, recursive, sorted bool) (*Event, error)
Set(nodePath string, value string, expireTime time.Time) (*Event, error)
Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error)
Update(nodePath string, newValue string, expireTime time.Time) (*Event, error)
Create(nodePath string, value string, incrementalSuffix bool,
Create(nodePath string, dir bool, value string, unique bool,
expireTime time.Time) (*Event, error)
CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
value string, expireTime time.Time) (*Event, error)
Delete(nodePath string, recursive bool) (*Event, error)
Delete(nodePath string, recursive, dir bool) (*Event, error)
Watch(prefix string, recursive bool, sinceIndex uint64) (<-chan *Event, error)
Save() ([]byte, error)
Recovery(state []byte) error
TotalTransactions() uint64
JsonStats() []byte
DeleteExpiredKeys(cutoff time.Time)
@ -156,10 +159,10 @@ func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
// Create function creates the node at nodePath. Create will help to create intermediate directories with no ttl.
// If the node has already existed, create will fail.
// If any node on the path is a file, create will fail.
func (s *store) Create(nodePath string, value string, unique bool, expireTime time.Time) (*Event, error) {
func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireTime time.Time) (*Event, error) {
s.worldLock.Lock()
defer s.worldLock.Unlock()
e, err := s.internalCreate(nodePath, value, unique, false, expireTime, Create)
e, err := s.internalCreate(nodePath, dir, value, unique, false, expireTime, Create)
if err == nil {
s.Stats.Inc(CreateSuccess)
@ -171,10 +174,10 @@ func (s *store) Create(nodePath string, value string, unique bool, expireTime ti
}
// Set function creates or replace the node at nodePath.
func (s *store) Set(nodePath string, value string, expireTime time.Time) (*Event, error) {
func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error) {
s.worldLock.Lock()
defer s.worldLock.Unlock()
e, err := s.internalCreate(nodePath, value, false, true, expireTime, Set)
e, err := s.internalCreate(nodePath, dir, value, false, true, expireTime, Set)
if err == nil {
s.Stats.Inc(SetSuccess)
@ -239,7 +242,7 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
// Delete function deletes the node at the given path.
// If the node is a directory, recursive must be true to delete it.
func (s *store) Delete(nodePath string, recursive bool) (*Event, error) {
func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
nodePath = path.Clean(path.Join("/", nodePath))
// we do not allow the user to change "/"
if nodePath == "/" {
@ -272,7 +275,7 @@ func (s *store) Delete(nodePath string, recursive bool) (*Event, error) {
s.WatcherHub.notifyWatchers(e, path, true)
}
err = n.Remove(recursive, callback)
err = n.Remove(dir, recursive, callback)
if err != nil {
s.Stats.Inc(DeleteFail)
@ -375,8 +378,9 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (
eNode.Value = newValue
} else {
// do not update value
// update value to empty
eNode.Value = n.Value
n.Value = ""
}
// update ttl
@ -393,7 +397,7 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (
return e, nil
}
func (s *store) internalCreate(nodePath string, value string, unique bool, replace bool,
func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool,
expireTime time.Time, action string) (*Event, error) {
currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
@ -415,10 +419,10 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla
expireTime = Permanent
}
dir, newNodeName := path.Split(nodePath)
dirName, nodeName := path.Split(nodePath)
// walk through the nodePath, create dirs and get the last directory node
d, err := s.walk(dir, s.checkDir)
d, err := s.walk(dirName, s.checkDir)
if err != nil {
s.Stats.Inc(SetFail)
@ -429,7 +433,7 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla
e := newEvent(action, nodePath, nextIndex, nextIndex)
eNode := e.Node
n, _ := d.GetChild(newNodeName)
n, _ := d.GetChild(nodeName)
// force will try to replace a existing file
if n != nil {
@ -439,13 +443,13 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla
}
eNode.PrevValue, _ = n.Read()
n.Remove(false, nil)
n.Remove(false, false, nil)
} else {
return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, currIndex)
}
}
if len(value) != 0 { // create file
if !dir { // create file
eNode.Value = value
n = newKV(s, nodePath, value, nextIndex, d, "", expireTime)
@ -512,7 +516,7 @@ func (s *store) DeleteExpiredKeys(cutoff time.Time) {
}
s.ttlKeyHeap.pop()
node.Remove(true, nil)
node.Remove(true, true, nil)
s.CurrentIndex++

View File

@ -26,21 +26,23 @@ func (f *CommandFactory) CreateUpgradeCommand() raft.Command {
}
// CreateSetCommand creates a version 2 command to set a key to a given value in the store.
func (f *CommandFactory) CreateSetCommand(key string, value string, expireTime time.Time) raft.Command {
func (f *CommandFactory) CreateSetCommand(key string, dir bool, value string, expireTime time.Time) raft.Command {
return &SetCommand{
Key: key,
Value: value,
ExpireTime: expireTime,
Dir: dir,
}
}
// CreateCreateCommand creates a version 2 command to create a new key in the store.
func (f *CommandFactory) CreateCreateCommand(key string, value string, expireTime time.Time, unique bool) raft.Command {
func (f *CommandFactory) CreateCreateCommand(key string, dir bool, value string, expireTime time.Time, unique bool) raft.Command {
return &CreateCommand{
Key: key,
Value: value,
ExpireTime: expireTime,
Unique: unique,
Dir: dir,
}
}
@ -54,10 +56,11 @@ func (f *CommandFactory) CreateUpdateCommand(key string, value string, expireTim
}
// CreateDeleteCommand creates a version 2 command to delete a key from the store.
func (f *CommandFactory) CreateDeleteCommand(key string, recursive bool) raft.Command {
func (f *CommandFactory) CreateDeleteCommand(key string, dir, recursive bool) raft.Command {
return &DeleteCommand{
Key: key,
Recursive: recursive,
Dir: dir,
}
}

View File

@ -18,6 +18,7 @@ type CreateCommand struct {
Value string `json:"value"`
ExpireTime time.Time `json:"expireTime"`
Unique bool `json:"unique"`
Dir bool `json:"dir"`
}
// The name of the create command in the log
@ -29,7 +30,7 @@ func (c *CreateCommand) CommandName() string {
func (c *CreateCommand) Apply(server raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(store.Store)
e, err := s.Create(c.Key, c.Value, c.Unique, c.ExpireTime)
e, err := s.Create(c.Key, c.Dir, c.Value, c.Unique, c.ExpireTime)
if err != nil {
log.Debug(err)

View File

@ -14,6 +14,7 @@ func init() {
type DeleteCommand struct {
Key string `json:"key"`
Recursive bool `json:"recursive"`
Dir bool `json:"dir"`
}
// The name of the delete command in the log
@ -25,7 +26,11 @@ func (c *DeleteCommand) CommandName() string {
func (c *DeleteCommand) Apply(server raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(store.Store)
e, err := s.Delete(c.Key, c.Recursive)
if c.Recursive {
c.Dir = true
}
e, err := s.Delete(c.Key, c.Dir, c.Recursive)
if err != nil {
log.Debug(err)

View File

@ -17,6 +17,7 @@ type SetCommand struct {
Key string `json:"key"`
Value string `json:"value"`
ExpireTime time.Time `json:"expireTime"`
Dir bool `json:"dir"`
}
// The name of the create command in the log
@ -29,7 +30,7 @@ func (c *SetCommand) Apply(server raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(store.Store)
// create a new node or replace the old node.
e, err := s.Set(c.Key, c.Value, c.ExpireTime)
e, err := s.Set(c.Key, c.Dir, c.Value, c.ExpireTime)
if err != nil {
log.Debug(err)