From b2e0836bb3b63fc114e2347d158923bae4d7c091 Mon Sep 17 00:00:00 2001 From: Blake Mizerany Date: Wed, 27 Aug 2014 13:37:22 -0700 Subject: [PATCH] etcdserver/etcdhttp: wip --- etcdserver2/etcdhttp/http.go | 68 +++++++++++++++++++++++++++++++ etcdserver2/etcdhttp/http_test.go | 5 +++ etcdserver2/server.go | 41 +++++++++++-------- 3 files changed, 97 insertions(+), 17 deletions(-) create mode 100644 etcdserver2/etcdhttp/http.go create mode 100644 etcdserver2/etcdhttp/http_test.go diff --git a/etcdserver2/etcdhttp/http.go b/etcdserver2/etcdhttp/http.go new file mode 100644 index 000000000..b03af321d --- /dev/null +++ b/etcdserver2/etcdhttp/http.go @@ -0,0 +1,68 @@ +package etcdhttp + +import ( + "io" + "net/http" + "time" + + "code.google.com/p/go.net/context" + etcdserver "github.com/coreos/etcd/etcdserver2" + "github.com/coreos/etcd/raft" +) + +func SendWithPrefix(prefix string, send etcdserver.SendFunc) etcdserver.SendFunc { + return etcdserver.SendFunc(func(m []raft.Message) { + /* + url = parseurl + u.Path = prefix + u.Path + for maxTrys { + resp, err := http.Post(u.String(), ...) + if err... + backoff? + } + */ + }) +} + +const DefaultTimeout = 500 * time.Millisecond + +type Handler struct { + Timeout time.Duration + Server etcdserver.Server +} + +func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // TODO: set read/write timeout? + + timeout := h.Timeout + if timeout == 0 { + timeout = DefaultTimeout + } + + ctx, _ := context.WithTimeout(context.Background(), timeout) + // TODO(bmizerany): watch the closenotify chan in another goroutine can + // call cancel when it closes. be sure to watch ctx.Done() too so we + // don't leak a ton of these goroutines. + + rr, err := parseRequest(r) + if err != nil { + http.Error(w, err.Error(), 400) + return + } + + resp, err := h.Server.Do(ctx, rr) + if err != nil { + // TODO(bmizerany): switch on store errors and etcdserver.ErrUnknownMethod + panic("TODO") + } + + encodeResponse(w, resp) +} + +func parseRequest(r *http.Request) (etcdserver.Request, error) { + return etcdserver.Request{}, nil +} + +func encodeResponse(w io.Writer, resp etcdserver.Response) { + +} diff --git a/etcdserver2/etcdhttp/http_test.go b/etcdserver2/etcdhttp/http_test.go new file mode 100644 index 000000000..7038d8fab --- /dev/null +++ b/etcdserver2/etcdhttp/http_test.go @@ -0,0 +1,5 @@ +package etcdhttp + +import "testing" + +func TestHandler(t *testing.T) {} diff --git a/etcdserver2/server.go b/etcdserver2/server.go index fa1df4bee..1cb3908e1 100644 --- a/etcdserver2/server.go +++ b/etcdserver2/server.go @@ -2,6 +2,7 @@ package etcdserver import ( "errors" + "sync" "time" "code.google.com/p/go.net/context" @@ -12,6 +13,8 @@ import ( var ErrUnknownMethod = errors.New("etcdserver: unknown method") +type SendFunc func(m []raft.Message) + type Response struct { // The last seen term raft was at when this request was built. Term int64 @@ -26,18 +29,17 @@ type Response struct { } type Server struct { - n raft.Node - w wait.List + once sync.Once + w wait.List - msgsc chan raft.Message - - st store.Store + Node raft.Node + Store store.Store // Send specifies the send function for sending msgs to peers. Send // MUST NOT block. It is okay to drop messages, since clients should // timeout and reissue their messages. If Send is nil, Server will // panic. - Send func(msgs []raft.Message) + Send SendFunc // Save specifies the save function for saving ents to stable storage. // Save MUST block until st and ents are on stable storage. If Send is @@ -45,10 +47,13 @@ type Server struct { Save func(st raft.State, ents []raft.Entry) } +func (s *Server) init() { s.w = wait.New() } + func (s *Server) Run(ctx context.Context) { + s.once.Do(s.init) for { select { - case rd := <-s.n.Ready(): + case rd := <-s.Node.Ready(): s.Save(rd.State, rd.Entries) s.Send(rd.Messages) go func() { @@ -68,6 +73,7 @@ func (s *Server) Run(ctx context.Context) { } func (s *Server) Do(ctx context.Context, r Request) (Response, error) { + s.once.Do(s.init) if r.Id == 0 { panic("r.Id cannot be 0") } @@ -78,7 +84,7 @@ func (s *Server) Do(ctx context.Context, r Request) (Response, error) { return Response{}, err } ch := s.w.Register(r.Id) - s.n.Propose(ctx, r.Id, data) + s.Node.Propose(ctx, r.Id, data) select { case x := <-ch: resp := x.(Response) @@ -90,13 +96,13 @@ func (s *Server) Do(ctx context.Context, r Request) (Response, error) { case "GET": switch { case r.Wait: - wc, err := s.st.Watch(r.Path, r.Recursive, false, r.Since) + wc, err := s.Store.Watch(r.Path, r.Recursive, false, r.Since) if err != nil { return Response{}, err } return Response{Watcher: wc}, nil default: - ev, err := s.st.Get(r.Path, r.Recursive, r.Sorted) + ev, err := s.Store.Get(r.Path, r.Recursive, r.Sorted) if err != nil { return Response{}, err } @@ -117,29 +123,30 @@ func (s *Server) apply(ctx context.Context, e raft.Entry) (*store.Event, error) expr := time.Unix(0, r.Expiration) switch r.Method { case "POST": - return s.st.Create(r.Path, r.Dir, r.Val, true, expr) + return s.Store.Create(r.Path, r.Dir, r.Val, true, expr) case "PUT": exists, existsSet := getBool(r.PrevExists) switch { case existsSet: if exists { - return s.st.Update(r.Path, r.Val, expr) + return s.Store.Update(r.Path, r.Val, expr) } else { - return s.st.Create(r.Path, r.Dir, r.Val, false, expr) + return s.Store.Create(r.Path, r.Dir, r.Val, false, expr) } case r.PrevIndex > 0 || r.PrevValue != "": - return s.st.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr) + return s.Store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr) default: - return s.st.Set(r.Path, r.Dir, r.Val, expr) + return s.Store.Set(r.Path, r.Dir, r.Val, expr) } case "DELETE": switch { case r.PrevIndex > 0 || r.PrevValue != "": - return s.st.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex) + return s.Store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex) default: - return s.st.Delete(r.Path, r.Recursive, r.Dir) + return s.Store.Delete(r.Path, r.Recursive, r.Dir) } default: + // This should never be reached, but just in case: return nil, ErrUnknownMethod } }