mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
commit
b43fba6a23
23
command.go
23
command.go
@ -36,9 +36,22 @@ func (c *SetCommand) Apply(server *raft.Server) (interface{}, error) {
|
|||||||
return store.Set(c.Key, c.Value, c.ExpireTime, server.CommitIndex())
|
return store.Set(c.Key, c.Value, c.ExpireTime, server.CommitIndex())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the path for http request
|
// TestAndSet command
|
||||||
func (c *SetCommand) GeneratePath() string {
|
type TestAndSetCommand struct {
|
||||||
return "set/" + c.Key
|
Key string `json:"key"`
|
||||||
|
Value string `json:"value"`
|
||||||
|
PrevValue string `json: prevValue`
|
||||||
|
ExpireTime time.Time `json:"expireTime"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// The name of the command in the log
|
||||||
|
func (c *TestAndSetCommand) CommandName() string {
|
||||||
|
return "testAndSet"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the value of key to value
|
||||||
|
func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) {
|
||||||
|
return store.TestAndSet(c.Key, c.PrevValue, c.Value, c.ExpireTime, server.CommitIndex())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get command
|
// Get command
|
||||||
@ -57,10 +70,6 @@ func (c *GetCommand) Apply(server *raft.Server) (interface{}, error) {
|
|||||||
return json.Marshal(res)
|
return json.Marshal(res)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *GetCommand) GeneratePath() string {
|
|
||||||
return "get/" + c.Key
|
|
||||||
}
|
|
||||||
|
|
||||||
// List command
|
// List command
|
||||||
type ListCommand struct {
|
type ListCommand struct {
|
||||||
Prefix string `json:"prefix"`
|
Prefix string `json:"prefix"`
|
||||||
|
4
etcd.go
4
etcd.go
@ -137,6 +137,9 @@ func main() {
|
|||||||
raft.RegisterCommand(&SetCommand{})
|
raft.RegisterCommand(&SetCommand{})
|
||||||
raft.RegisterCommand(&GetCommand{})
|
raft.RegisterCommand(&GetCommand{})
|
||||||
raft.RegisterCommand(&DeleteCommand{})
|
raft.RegisterCommand(&DeleteCommand{})
|
||||||
|
raft.RegisterCommand(&WatchCommand{})
|
||||||
|
raft.RegisterCommand(&ListCommand{})
|
||||||
|
raft.RegisterCommand(&TestAndSetCommand{})
|
||||||
|
|
||||||
if err := os.MkdirAll(dirPath, 0744); err != nil {
|
if err := os.MkdirAll(dirPath, 0744); err != nil {
|
||||||
fatal("Unable to create path: %v", err)
|
fatal("Unable to create path: %v", err)
|
||||||
@ -326,6 +329,7 @@ func startClientTransport(port int, st int) {
|
|||||||
http.HandleFunc("/v1/keys/", Multiplexer)
|
http.HandleFunc("/v1/keys/", Multiplexer)
|
||||||
http.HandleFunc("/v1/watch/", WatchHttpHandler)
|
http.HandleFunc("/v1/watch/", WatchHttpHandler)
|
||||||
http.HandleFunc("/v1/list/", ListHttpHandler)
|
http.HandleFunc("/v1/list/", ListHttpHandler)
|
||||||
|
http.HandleFunc("/v1/testAndSet/", TestAndSetHttpHandler)
|
||||||
http.HandleFunc("/master", MasterHttpHandler)
|
http.HandleFunc("/master", MasterHttpHandler)
|
||||||
|
|
||||||
switch st {
|
switch st {
|
||||||
|
29
handlers.go
29
handlers.go
@ -134,6 +134,35 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAndSetHttpHandler(w http.ResponseWriter, req *http.Request) {
|
||||||
|
key := req.URL.Path[len("/v1/testAndSet/"):]
|
||||||
|
|
||||||
|
debug("[recv] POST http://%v/v1/testAndSet/%s", server.Name(), key)
|
||||||
|
|
||||||
|
command := &TestAndSetCommand{}
|
||||||
|
command.Key = key
|
||||||
|
|
||||||
|
command.PrevValue = req.FormValue("prevValue")
|
||||||
|
command.Value = req.FormValue("value")
|
||||||
|
strDuration := req.FormValue("ttl")
|
||||||
|
|
||||||
|
if strDuration != "" {
|
||||||
|
duration, err := strconv.Atoi(strDuration)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
warn("raftd: Bad duration: %v", err)
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
command.ExpireTime = time.Now().Add(time.Second * (time.Duration)(duration))
|
||||||
|
} else {
|
||||||
|
command.ExpireTime = time.Unix(0, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
excute(command, &w, req)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
|
func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
|
||||||
key := req.URL.Path[len("/v1/keys/"):]
|
key := req.URL.Path[len("/v1/keys/"):]
|
||||||
|
|
||||||
|
149
store/store.go
149
store/store.go
@ -212,75 +212,6 @@ func Set(key string, value string, expireTime time.Time, index uint64) ([]byte,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// should be used as a go routine to delete the key when it expires
|
|
||||||
func expire(key string, update chan time.Time, expireTime time.Time) {
|
|
||||||
duration := expireTime.Sub(time.Now())
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
// timeout delete the node
|
|
||||||
case <-time.After(duration):
|
|
||||||
node, ok := s.Tree.get(key)
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
} else {
|
|
||||||
|
|
||||||
s.Tree.delete(key)
|
|
||||||
|
|
||||||
resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime, 0, s.Index}
|
|
||||||
|
|
||||||
msg, err := json.Marshal(resp)
|
|
||||||
|
|
||||||
notify(resp)
|
|
||||||
|
|
||||||
// notify the messager
|
|
||||||
if s.messager != nil && err == nil {
|
|
||||||
|
|
||||||
*s.messager <- string(msg)
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
case updateTime := <-update:
|
|
||||||
//update duration
|
|
||||||
// if the node become a permanent one, the go routine is
|
|
||||||
// not needed
|
|
||||||
if updateTime.Equal(PERMANENT) {
|
|
||||||
fmt.Println("permanent")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// update duration
|
|
||||||
duration = updateTime.Sub(time.Now())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func updateMap(index uint64, resp *Response) {
|
|
||||||
|
|
||||||
if s.ResponseMaxSize == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
strIndex := strconv.FormatUint(index, 10)
|
|
||||||
s.ResponseMap[strIndex] = *resp
|
|
||||||
|
|
||||||
// unlimited
|
|
||||||
if s.ResponseMaxSize < 0{
|
|
||||||
s.ResponseCurrSize++
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if s.ResponseCurrSize == uint(s.ResponseMaxSize) {
|
|
||||||
s.ResponseStartIndex++
|
|
||||||
delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10))
|
|
||||||
} else {
|
|
||||||
s.ResponseCurrSize++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// get the value of the key
|
// get the value of the key
|
||||||
func Get(key string) Response {
|
func Get(key string) Response {
|
||||||
key = "/" + key
|
key = "/" + key
|
||||||
@ -375,6 +306,86 @@ func Delete(key string, index uint64) ([]byte, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// set the value of the key to the value if the given prevValue is equal to the value of the key
|
||||||
|
func TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) {
|
||||||
|
resp := Get(key)
|
||||||
|
|
||||||
|
if resp.PrevValue == prevValue {
|
||||||
|
return Set(key, value, expireTime, index)
|
||||||
|
} else {
|
||||||
|
return json.Marshal(resp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// should be used as a go routine to delete the key when it expires
|
||||||
|
func expire(key string, update chan time.Time, expireTime time.Time) {
|
||||||
|
duration := expireTime.Sub(time.Now())
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
// timeout delete the node
|
||||||
|
case <-time.After(duration):
|
||||||
|
node, ok := s.Tree.get(key)
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
|
||||||
|
s.Tree.delete(key)
|
||||||
|
|
||||||
|
resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime, 0, s.Index}
|
||||||
|
|
||||||
|
msg, err := json.Marshal(resp)
|
||||||
|
|
||||||
|
notify(resp)
|
||||||
|
|
||||||
|
// notify the messager
|
||||||
|
if s.messager != nil && err == nil {
|
||||||
|
|
||||||
|
*s.messager <- string(msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
case updateTime := <-update:
|
||||||
|
//update duration
|
||||||
|
// if the node become a permanent one, the go routine is
|
||||||
|
// not needed
|
||||||
|
if updateTime.Equal(PERMANENT) {
|
||||||
|
fmt.Println("permanent")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// update duration
|
||||||
|
duration = updateTime.Sub(time.Now())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func updateMap(index uint64, resp *Response) {
|
||||||
|
|
||||||
|
if s.ResponseMaxSize == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
strIndex := strconv.FormatUint(index, 10)
|
||||||
|
s.ResponseMap[strIndex] = *resp
|
||||||
|
|
||||||
|
// unlimited
|
||||||
|
if s.ResponseMaxSize < 0{
|
||||||
|
s.ResponseCurrSize++
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.ResponseCurrSize == uint(s.ResponseMaxSize) {
|
||||||
|
s.ResponseStartIndex++
|
||||||
|
delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10))
|
||||||
|
} else {
|
||||||
|
s.ResponseCurrSize++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// save the current state of the storage system
|
// save the current state of the storage system
|
||||||
func (s *Store) Save() ([]byte, error) {
|
func (s *Store) Save() ([]byte, error) {
|
||||||
b, err := json.Marshal(s)
|
b, err := json.Marshal(s)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user