mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
feat(stream watchers) add stream watcher support
This commit is contained in:
@@ -25,7 +25,7 @@ func WatchKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
|
||||
}
|
||||
|
||||
// Start the watcher on the store.
|
||||
watcher, err := s.Store().Watch(key, false, sinceIndex)
|
||||
watcher, err := s.Store().Watch(key, false, false, sinceIndex)
|
||||
if err != nil {
|
||||
return etcdErr.NewError(500, key, s.Store().Index())
|
||||
}
|
||||
|
||||
@@ -4,20 +4,17 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
"strconv"
|
||||
|
||||
etcdErr "github.com/coreos/etcd/error"
|
||||
"github.com/coreos/etcd/log"
|
||||
"github.com/coreos/etcd/store"
|
||||
"github.com/coreos/raft"
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
|
||||
var err error
|
||||
var event *store.Event
|
||||
|
||||
vars := mux.Vars(req)
|
||||
key := "/" + vars["key"]
|
||||
|
||||
@@ -40,52 +37,86 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
|
||||
}
|
||||
|
||||
recursive := (req.FormValue("recursive") == "true")
|
||||
sorted := (req.FormValue("sorted") == "true")
|
||||
sort := (req.FormValue("sorted") == "true")
|
||||
waitIndex := req.FormValue("waitIndex")
|
||||
stream := (req.FormValue("stream") == "true")
|
||||
|
||||
if req.FormValue("wait") == "true" { // watch
|
||||
// Create a command to watch from a given index (default 0).
|
||||
var sinceIndex uint64 = 0
|
||||
if req.FormValue("wait") == "true" {
|
||||
return handleWatch(key, recursive, stream, waitIndex, w, s)
|
||||
}
|
||||
|
||||
waitIndex := req.FormValue("waitIndex")
|
||||
if waitIndex != "" {
|
||||
sinceIndex, err = strconv.ParseUint(string(req.FormValue("waitIndex")), 10, 64)
|
||||
if err != nil {
|
||||
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", s.Store().Index())
|
||||
}
|
||||
}
|
||||
return handleGet(key, recursive, sort, w, s)
|
||||
}
|
||||
|
||||
// Start the watcher on the store.
|
||||
watcher, err := s.Store().Watch(key, recursive, sinceIndex)
|
||||
func handleWatch(key string, recursive, stream bool, waitIndex string, w http.ResponseWriter, s Server) error {
|
||||
// Create a command to watch from a given index (default 0).
|
||||
var sinceIndex uint64 = 0
|
||||
var err error
|
||||
|
||||
if waitIndex != "" {
|
||||
sinceIndex, err = strconv.ParseUint(waitIndex, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cn, _ := w.(http.CloseNotifier)
|
||||
closeChan := cn.CloseNotify()
|
||||
|
||||
select {
|
||||
case <-closeChan:
|
||||
watcher.Remove()
|
||||
return nil
|
||||
case event = <-watcher.EventChan:
|
||||
}
|
||||
|
||||
} else { //get
|
||||
// Retrieve the key from the store.
|
||||
event, err = s.Store().Get(key, recursive, sorted)
|
||||
if err != nil {
|
||||
return err
|
||||
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", s.Store().Index())
|
||||
}
|
||||
}
|
||||
|
||||
watcher, err := s.Store().Watch(key, recursive, stream, sinceIndex)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cn, _ := w.(http.CloseNotifier)
|
||||
closeChan := cn.CloseNotify()
|
||||
|
||||
writeHeaders(w, s)
|
||||
|
||||
if stream {
|
||||
// watcher hub will not help to remove stream watcher
|
||||
// so we need to remove here
|
||||
defer watcher.Remove()
|
||||
chunkWriter := httputil.NewChunkedWriter(w)
|
||||
for {
|
||||
select {
|
||||
case <-closeChan:
|
||||
chunkWriter.Close()
|
||||
return nil
|
||||
case event := <-watcher.EventChan:
|
||||
b, _ := json.Marshal(event)
|
||||
_, err := chunkWriter.Write(b)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
w.(http.Flusher).Flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-closeChan:
|
||||
watcher.Remove()
|
||||
case event := <-watcher.EventChan:
|
||||
b, _ := json.Marshal(event)
|
||||
w.Write(b)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleGet(key string, recursive, sort bool, w http.ResponseWriter, s Server) error {
|
||||
event, err := s.Store().Get(key, recursive, sort)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
writeHeaders(w, s)
|
||||
b, _ := json.Marshal(event)
|
||||
w.Write(b)
|
||||
return nil
|
||||
}
|
||||
|
||||
func writeHeaders(w http.ResponseWriter, s Server) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Header().Add("X-Etcd-Index", fmt.Sprint(s.Store().Index()))
|
||||
w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex()))
|
||||
w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term()))
|
||||
w.WriteHeader(http.StatusOK)
|
||||
b, _ := json.Marshal(event)
|
||||
|
||||
w.Write(b)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user