etcdserver/etcdhttp: wip

This commit is contained in:
Blake Mizerany 2014-08-27 13:37:22 -07:00 committed by Yicheng Qin
parent 2d3cef2496
commit b2e0836bb3
3 changed files with 97 additions and 17 deletions

View File

@ -0,0 +1,68 @@
package etcdhttp
import (
"io"
"net/http"
"time"
"code.google.com/p/go.net/context"
etcdserver "github.com/coreos/etcd/etcdserver2"
"github.com/coreos/etcd/raft"
)
func SendWithPrefix(prefix string, send etcdserver.SendFunc) etcdserver.SendFunc {
return etcdserver.SendFunc(func(m []raft.Message) {
/*
url = parseurl
u.Path = prefix + u.Path
for maxTrys {
resp, err := http.Post(u.String(), ...)
if err...
backoff?
}
*/
})
}
const DefaultTimeout = 500 * time.Millisecond
type Handler struct {
Timeout time.Duration
Server etcdserver.Server
}
func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// TODO: set read/write timeout?
timeout := h.Timeout
if timeout == 0 {
timeout = DefaultTimeout
}
ctx, _ := context.WithTimeout(context.Background(), timeout)
// TODO(bmizerany): watch the closenotify chan in another goroutine can
// call cancel when it closes. be sure to watch ctx.Done() too so we
// don't leak a ton of these goroutines.
rr, err := parseRequest(r)
if err != nil {
http.Error(w, err.Error(), 400)
return
}
resp, err := h.Server.Do(ctx, rr)
if err != nil {
// TODO(bmizerany): switch on store errors and etcdserver.ErrUnknownMethod
panic("TODO")
}
encodeResponse(w, resp)
}
func parseRequest(r *http.Request) (etcdserver.Request, error) {
return etcdserver.Request{}, nil
}
func encodeResponse(w io.Writer, resp etcdserver.Response) {
}

View File

@ -0,0 +1,5 @@
package etcdhttp
import "testing"
func TestHandler(t *testing.T) {}

View File

@ -2,6 +2,7 @@ package etcdserver
import (
"errors"
"sync"
"time"
"code.google.com/p/go.net/context"
@ -12,6 +13,8 @@ import (
var ErrUnknownMethod = errors.New("etcdserver: unknown method")
type SendFunc func(m []raft.Message)
type Response struct {
// The last seen term raft was at when this request was built.
Term int64
@ -26,18 +29,17 @@ type Response struct {
}
type Server struct {
n raft.Node
w wait.List
once sync.Once
w wait.List
msgsc chan raft.Message
st store.Store
Node raft.Node
Store store.Store
// Send specifies the send function for sending msgs to peers. Send
// MUST NOT block. It is okay to drop messages, since clients should
// timeout and reissue their messages. If Send is nil, Server will
// panic.
Send func(msgs []raft.Message)
Send SendFunc
// Save specifies the save function for saving ents to stable storage.
// Save MUST block until st and ents are on stable storage. If Send is
@ -45,10 +47,13 @@ type Server struct {
Save func(st raft.State, ents []raft.Entry)
}
func (s *Server) init() { s.w = wait.New() }
func (s *Server) Run(ctx context.Context) {
s.once.Do(s.init)
for {
select {
case rd := <-s.n.Ready():
case rd := <-s.Node.Ready():
s.Save(rd.State, rd.Entries)
s.Send(rd.Messages)
go func() {
@ -68,6 +73,7 @@ func (s *Server) Run(ctx context.Context) {
}
func (s *Server) Do(ctx context.Context, r Request) (Response, error) {
s.once.Do(s.init)
if r.Id == 0 {
panic("r.Id cannot be 0")
}
@ -78,7 +84,7 @@ func (s *Server) Do(ctx context.Context, r Request) (Response, error) {
return Response{}, err
}
ch := s.w.Register(r.Id)
s.n.Propose(ctx, r.Id, data)
s.Node.Propose(ctx, r.Id, data)
select {
case x := <-ch:
resp := x.(Response)
@ -90,13 +96,13 @@ func (s *Server) Do(ctx context.Context, r Request) (Response, error) {
case "GET":
switch {
case r.Wait:
wc, err := s.st.Watch(r.Path, r.Recursive, false, r.Since)
wc, err := s.Store.Watch(r.Path, r.Recursive, false, r.Since)
if err != nil {
return Response{}, err
}
return Response{Watcher: wc}, nil
default:
ev, err := s.st.Get(r.Path, r.Recursive, r.Sorted)
ev, err := s.Store.Get(r.Path, r.Recursive, r.Sorted)
if err != nil {
return Response{}, err
}
@ -117,29 +123,30 @@ func (s *Server) apply(ctx context.Context, e raft.Entry) (*store.Event, error)
expr := time.Unix(0, r.Expiration)
switch r.Method {
case "POST":
return s.st.Create(r.Path, r.Dir, r.Val, true, expr)
return s.Store.Create(r.Path, r.Dir, r.Val, true, expr)
case "PUT":
exists, existsSet := getBool(r.PrevExists)
switch {
case existsSet:
if exists {
return s.st.Update(r.Path, r.Val, expr)
return s.Store.Update(r.Path, r.Val, expr)
} else {
return s.st.Create(r.Path, r.Dir, r.Val, false, expr)
return s.Store.Create(r.Path, r.Dir, r.Val, false, expr)
}
case r.PrevIndex > 0 || r.PrevValue != "":
return s.st.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr)
return s.Store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr)
default:
return s.st.Set(r.Path, r.Dir, r.Val, expr)
return s.Store.Set(r.Path, r.Dir, r.Val, expr)
}
case "DELETE":
switch {
case r.PrevIndex > 0 || r.PrevValue != "":
return s.st.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex)
return s.Store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex)
default:
return s.st.Delete(r.Path, r.Recursive, r.Dir)
return s.Store.Delete(r.Path, r.Recursive, r.Dir)
}
default:
// This should never be reached, but just in case:
return nil, ErrUnknownMethod
}
}