mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver/etcdhttp: switch to using http.ServeMux
This commit is contained in:
parent
763c276d27
commit
e04c028d64
@ -28,57 +28,56 @@ import (
|
||||
const (
|
||||
keysPrefix = "/v2/keys"
|
||||
machinesPrefix = "/v2/machines"
|
||||
raftPrefix = "/raft"
|
||||
|
||||
DefaultTimeout = 500 * time.Millisecond
|
||||
)
|
||||
|
||||
var errClosed = errors.New("etcdhttp: client closed connection")
|
||||
|
||||
// Handler implements the http.Handler interface and serves etcd client and
|
||||
// raft communication.
|
||||
type Handler struct {
|
||||
Timeout time.Duration
|
||||
Server etcdserver.Server
|
||||
// NewHandler generates a muxed http.Handler with the given parameters.
|
||||
func NewHandler(server etcdserver.Server, peers Peers, timeout time.Duration) http.Handler {
|
||||
sh := &serverHandler{
|
||||
timeout: timeout,
|
||||
server: server,
|
||||
peers: peers,
|
||||
}
|
||||
if sh.timeout == 0 {
|
||||
sh.timeout = DefaultTimeout
|
||||
}
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc(raftPrefix, sh.serveRaft)
|
||||
mux.HandleFunc(keysPrefix, sh.serveKeys)
|
||||
mux.HandleFunc(keysPrefix+"/", sh.serveKeys)
|
||||
// TODO: dynamic configuration may make this outdated. take care of it.
|
||||
// TODO: dynamic configuration may introduce race also.
|
||||
Peers Peers
|
||||
mux.HandleFunc(machinesPrefix, sh.serveMachines)
|
||||
mux.HandleFunc("/", http.NotFound)
|
||||
return mux
|
||||
}
|
||||
|
||||
func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
// TODO: set read/write timeout?
|
||||
|
||||
timeout := h.Timeout
|
||||
if timeout == 0 {
|
||||
timeout = DefaultTimeout
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
|
||||
switch {
|
||||
case strings.HasPrefix(r.URL.Path, "/raft"):
|
||||
h.serveRaft(ctx, w, r)
|
||||
case strings.HasPrefix(r.URL.Path, keysPrefix):
|
||||
h.serveKeys(ctx, w, r)
|
||||
case strings.HasPrefix(r.URL.Path, machinesPrefix):
|
||||
h.serveMachines(w, r)
|
||||
default:
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
// serverHandler provides http.Handlers for etcd client and raft communication.
|
||||
type serverHandler struct {
|
||||
timeout time.Duration
|
||||
server etcdserver.Server
|
||||
peers Peers
|
||||
}
|
||||
|
||||
func (h Handler) serveKeys(ctx context.Context, w http.ResponseWriter, r *http.Request) {
|
||||
func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
|
||||
if !allowMethod(w, r.Method, "GET", "PUT", "POST", "DELETE") {
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
|
||||
defer cancel()
|
||||
|
||||
rr, err := parseRequest(r, genID())
|
||||
if err != nil {
|
||||
writeError(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
resp, err := h.Server.Do(ctx, rr)
|
||||
resp, err := h.server.Do(ctx, rr)
|
||||
if err != nil {
|
||||
writeError(w, err)
|
||||
return
|
||||
@ -106,18 +105,19 @@ func (h Handler) serveKeys(ctx context.Context, w http.ResponseWriter, r *http.R
|
||||
|
||||
// serveMachines responds address list in the format '0.0.0.0, 1.1.1.1'.
|
||||
// TODO: rethink the format of machine list because it is not json format.
|
||||
func (h Handler) serveMachines(w http.ResponseWriter, r *http.Request) {
|
||||
func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) {
|
||||
if !allowMethod(w, r.Method, "GET", "HEAD") {
|
||||
return
|
||||
}
|
||||
endpoints := h.Peers.Endpoints()
|
||||
endpoints := h.peers.Endpoints()
|
||||
w.Write([]byte(strings.Join(endpoints, ", ")))
|
||||
}
|
||||
|
||||
func (h Handler) serveRaft(ctx context.Context, w http.ResponseWriter, r *http.Request) {
|
||||
func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
|
||||
if !allowMethod(w, r.Method, "POST") {
|
||||
return
|
||||
}
|
||||
|
||||
b, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
log.Println("etcdhttp: error reading raft message:", err)
|
||||
@ -127,7 +127,7 @@ func (h Handler) serveRaft(ctx context.Context, w http.ResponseWriter, r *http.R
|
||||
log.Println("etcdhttp: error unmarshaling raft message:", err)
|
||||
}
|
||||
log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m)
|
||||
if err := h.Server.Process(ctx, m); err != nil {
|
||||
if err := h.server.Process(context.TODO(), m); err != nil {
|
||||
log.Println("etcdhttp: error processing raft message:", err)
|
||||
writeError(w, err)
|
||||
return
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
etcdErr "github.com/coreos/etcd/error"
|
||||
"github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
@ -587,8 +588,8 @@ func TestV2MachinesEndpoint(t *testing.T) {
|
||||
{"POST", http.StatusMethodNotAllowed},
|
||||
}
|
||||
|
||||
h := Handler{Peers: Peers{}}
|
||||
s := httptest.NewServer(h)
|
||||
m := NewHandler(nil, Peers{}, time.Hour)
|
||||
s := httptest.NewServer(m)
|
||||
defer s.Close()
|
||||
|
||||
for _, tt := range tests {
|
||||
@ -610,13 +611,13 @@ func TestV2MachinesEndpoint(t *testing.T) {
|
||||
func TestServeMachines(t *testing.T) {
|
||||
peers := Peers{}
|
||||
peers.Set("0xBEEF0=localhost:8080&0xBEEF1=localhost:8081&0xBEEF2=localhost:8082")
|
||||
h := Handler{Peers: peers}
|
||||
|
||||
writer := httptest.NewRecorder()
|
||||
req, err := http.NewRequest("GET", "", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
h := &serverHandler{peers: peers}
|
||||
h.serveMachines(writer, req)
|
||||
w := "http://localhost:8080, http://localhost:8081, http://localhost:8082"
|
||||
if g := writer.Body.String(); g != w {
|
||||
|
@ -36,11 +36,7 @@ func TestSet(t *testing.T) {
|
||||
srv.Start()
|
||||
defer srv.Stop()
|
||||
|
||||
h := etcdhttp.Handler{
|
||||
Timeout: time.Hour,
|
||||
Server: srv,
|
||||
}
|
||||
|
||||
h := etcdhttp.NewHandler(srv, nil, time.Hour)
|
||||
s := httptest.NewServer(h)
|
||||
defer s.Close()
|
||||
|
||||
@ -50,7 +46,7 @@ func TestSet(t *testing.T) {
|
||||
}
|
||||
|
||||
if resp.StatusCode != 201 {
|
||||
t.Errorf("StatusCode = %d, expected %d", 201, resp.StatusCode)
|
||||
t.Errorf("StatusCode = %d, expected %d", resp.StatusCode, 201)
|
||||
}
|
||||
|
||||
g := new(store.Event)
|
||||
|
9
main.go
9
main.go
@ -83,14 +83,7 @@ func startEtcd() http.Handler {
|
||||
Ticker: time.Tick(100 * time.Millisecond),
|
||||
}
|
||||
s.Start()
|
||||
|
||||
h := etcdhttp.Handler{
|
||||
Timeout: *timeout,
|
||||
Server: s,
|
||||
Peers: *peers,
|
||||
}
|
||||
|
||||
return &h
|
||||
return etcdhttp.NewHandler(s, *peers, *timeout)
|
||||
}
|
||||
|
||||
// startRaft starts a raft node from the given wal dir.
|
||||
|
Loading…
x
Reference in New Issue
Block a user