Merge pull request #2043 from xiang90/leader_member

etcdhttp: support member/leader endpoint
This commit is contained in:
Xiang Li 2015-01-06 09:01:58 -08:00
commit cbdb0266e9
4 changed files with 86 additions and 16 deletions

View File

@ -159,14 +159,26 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "GET":
if trimPrefix(r.URL.Path, membersPrefix) != "" {
switch trimPrefix(r.URL.Path, membersPrefix) {
case "":
mc := newMemberCollection(h.clusterInfo.Members())
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(mc); err != nil {
log.Printf("etcdhttp: %v", err)
}
case "leader":
id := h.server.Leader()
if id == 0 {
writeError(w, httptypes.NewHTTPError(http.StatusServiceUnavailable, "During election"))
return
}
m := newMember(h.clusterInfo.Member(id))
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(m); err != nil {
log.Printf("etcdhttp: %v", err)
}
default:
writeError(w, httptypes.NewHTTPError(http.StatusNotFound, "Not found"))
return
}
mc := newMemberCollection(h.clusterInfo.Members())
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(mc); err != nil {
log.Printf("etcdhttp: %v", err)
}
case "POST":
req := httptypes.MemberCreateRequest{}

View File

@ -92,9 +92,10 @@ type serverRecorder struct {
actions []action
}
func (s *serverRecorder) Start() {}
func (s *serverRecorder) Stop() {}
func (s *serverRecorder) ID() types.ID { return types.ID(1) }
func (s *serverRecorder) Start() {}
func (s *serverRecorder) Stop() {}
func (s *serverRecorder) Leader() types.ID { return types.ID(1) }
func (s *serverRecorder) ID() types.ID { return types.ID(1) }
func (s *serverRecorder) Do(_ context.Context, r etcdserverpb.Request) (etcdserver.Response, error) {
s.actions = append(s.actions, action{name: "Do", params: []interface{}{r}})
return etcdserver.Response{}, nil
@ -139,9 +140,10 @@ type resServer struct {
res etcdserver.Response
}
func (rs *resServer) Start() {}
func (rs *resServer) Stop() {}
func (rs *resServer) ID() types.ID { return types.ID(1) }
func (rs *resServer) Start() {}
func (rs *resServer) Stop() {}
func (rs *resServer) ID() types.ID { return types.ID(1) }
func (rs *resServer) Leader() types.ID { return types.ID(1) }
func (rs *resServer) Do(_ context.Context, _ etcdserverpb.Request) (etcdserver.Response, error) {
return rs.res, nil
}
@ -604,6 +606,57 @@ func TestServeMembers(t *testing.T) {
}
}
// TODO: consolidate **ALL** fake server implementations and add no leader test case.
func TestServeLeader(t *testing.T) {
memb1 := etcdserver.Member{ID: 1, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080"}}}
memb2 := etcdserver.Member{ID: 2, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8081"}}}
cluster := &fakeCluster{
id: 1,
members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2},
}
h := &membersHandler{
server: &serverRecorder{},
clock: clockwork.NewFakeClock(),
clusterInfo: cluster,
}
wmc := string(`{"id":"1","name":"","peerURLs":[],"clientURLs":["http://localhost:8080"]}`)
tests := []struct {
path string
wcode int
wct string
wbody string
}{
{membersPrefix + "leader", http.StatusOK, "application/json", wmc + "\n"},
// TODO: add no leader case
}
for i, tt := range tests {
req, err := http.NewRequest("GET", mustNewURL(t, tt.path).String(), nil)
if err != nil {
t.Fatal(err)
}
rw := httptest.NewRecorder()
h.ServeHTTP(rw, req)
if rw.Code != tt.wcode {
t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode)
}
if gct := rw.Header().Get("Content-Type"); gct != tt.wct {
t.Errorf("#%d: content-type = %s, want %s", i, gct, tt.wct)
}
gcid := rw.Header().Get("X-Etcd-Cluster-ID")
wcid := cluster.ID().String()
if gcid != wcid {
t.Errorf("#%d: cid = %s, want %s", i, gcid, wcid)
}
if rw.Body.String() != tt.wbody {
t.Errorf("#%d: body = %q, want %q", i, rw.Body.String(), tt.wbody)
}
}
}
func TestServeMembersCreate(t *testing.T) {
u := mustNewURL(t, membersPrefix)
b := []byte(`{"peerURLs":["http://127.0.0.1:1"]}`)

View File

@ -65,9 +65,10 @@ type errServer struct {
err error
}
func (fs *errServer) Start() {}
func (fs *errServer) Stop() {}
func (fs *errServer) ID() types.ID { return types.ID(1) }
func (fs *errServer) Start() {}
func (fs *errServer) Stop() {}
func (fs *errServer) ID() types.ID { return types.ID(1) }
func (fs *errServer) Leader() types.ID { return types.ID(1) }
func (fs *errServer) Do(ctx context.Context, r etcdserverpb.Request) (etcdserver.Response, error) {
return etcdserver.Response{}, fs.err
}

View File

@ -93,6 +93,8 @@ type Server interface {
Stop()
// ID returns the ID of the Server.
ID() types.ID
// Leader returns the ID of the leader Server.
Leader() types.ID
// Do takes a request and attempts to fulfill it, returning a Response.
Do(ctx context.Context, r pb.Request) (Response, error)
// Process takes a raft message and applies it to the server's raft state
@ -579,6 +581,8 @@ func (s *EtcdServer) Term() uint64 { return atomic.LoadUint64(&s.raftTerm) }
// Index, Term, Lead, Committed, Applied, LastIndex, etc.
func (s *EtcdServer) Lead() uint64 { return atomic.LoadUint64(&s.raftLead) }
func (s *EtcdServer) Leader() types.ID { return types.ID(s.Lead()) }
// configure sends a configuration change through consensus and
// then waits for it to be applied to the server. It
// will block until the change is performed or there is an error.