mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #1335 from unihorn/174
etcdserver/etcdhttp: store location adjustment
This commit is contained in:
commit
f26bb6ad44
@ -77,7 +77,7 @@ func (s *clusterStore) Add(m Member) {
|
||||
func (s *clusterStore) Get() Cluster {
|
||||
c := NewCluster()
|
||||
c.id = s.id
|
||||
e, err := s.Store.Get(membersKVPrefix, true, true)
|
||||
e, err := s.Store.Get(storeMembersPrefix, true, true)
|
||||
if err != nil {
|
||||
if v, ok := err.(*etcdErr.Error); ok && v.ErrorCode == etcdErr.EcodeKeyNotFound {
|
||||
return *c
|
||||
|
@ -17,6 +17,7 @@
|
||||
package etcdserver
|
||||
|
||||
import (
|
||||
"path"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
@ -34,7 +35,7 @@ func TestClusterStoreAdd(t *testing.T) {
|
||||
{
|
||||
name: "Create",
|
||||
params: []interface{}{
|
||||
membersKVPrefix + "1/raftAttributes",
|
||||
path.Join(storeMembersPrefix, "1", "raftAttributes"),
|
||||
false,
|
||||
`{"PeerURLs":null}`,
|
||||
false,
|
||||
@ -44,7 +45,7 @@ func TestClusterStoreAdd(t *testing.T) {
|
||||
{
|
||||
name: "Create",
|
||||
params: []interface{}{
|
||||
membersKVPrefix + "1/attributes",
|
||||
path.Join(storeMembersPrefix, "1", "attributes"),
|
||||
false,
|
||||
`{"Name":"node1","ClientURLs":null}`,
|
||||
false,
|
||||
@ -113,7 +114,7 @@ func TestClusterStoreDelete(t *testing.T) {
|
||||
cs.Add(newTestMember(1, nil, "node1", nil))
|
||||
cs.Remove(1)
|
||||
|
||||
wdeletes := []string{membersKVPrefix + "1"}
|
||||
wdeletes := []string{path.Join(storeMembersPrefix, "1")}
|
||||
if !reflect.DeepEqual(st.deletes, wdeletes) {
|
||||
t.Errorf("deletes = %v, want %v", st.deletes, wdeletes)
|
||||
}
|
||||
|
@ -24,6 +24,7 @@ import (
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
@ -107,7 +108,7 @@ func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
|
||||
defer cancel()
|
||||
|
||||
rr, err := parseRequest(r, etcdserver.GenID(), clockwork.NewRealClock())
|
||||
rr, err := parseKeyRequest(r, etcdserver.GenID(), clockwork.NewRealClock())
|
||||
if err != nil {
|
||||
writeError(w, err)
|
||||
return
|
||||
@ -121,14 +122,14 @@ func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
switch {
|
||||
case resp.Event != nil:
|
||||
if err := writeEvent(w, resp.Event, h.timer); err != nil {
|
||||
if err := writeKeyEvent(w, resp.Event, h.timer); err != nil {
|
||||
// Should never be reached
|
||||
log.Printf("error writing event: %v", err)
|
||||
}
|
||||
case resp.Watcher != nil:
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
|
||||
defer cancel()
|
||||
handleWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
|
||||
handleKeyWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
|
||||
default:
|
||||
writeError(w, errors.New("received response with no Event/Watcher!"))
|
||||
}
|
||||
@ -248,10 +249,10 @@ func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
// parseRequest converts a received http.Request to a server Request,
|
||||
// performing validation of supplied fields as appropriate.
|
||||
// parseKeyRequest converts a received http.Request on keysPrefix to
|
||||
// a server Request, performing validation of supplied fields as appropriate.
|
||||
// If any validation fails, an empty Request and non-nil error is returned.
|
||||
func parseRequest(r *http.Request, id uint64, clock clockwork.Clock) (etcdserverpb.Request, error) {
|
||||
func parseKeyRequest(r *http.Request, id uint64, clock clockwork.Clock) (etcdserverpb.Request, error) {
|
||||
emptyReq := etcdserverpb.Request{}
|
||||
|
||||
err := r.ParseForm()
|
||||
@ -268,7 +269,7 @@ func parseRequest(r *http.Request, id uint64, clock clockwork.Clock) (etcdserver
|
||||
"incorrect key prefix",
|
||||
)
|
||||
}
|
||||
p := r.URL.Path[len(keysPrefix):]
|
||||
p := path.Join(etcdserver.StoreKeysPrefix, r.URL.Path[len(keysPrefix):])
|
||||
|
||||
var pIdx, wIdx uint64
|
||||
if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil {
|
||||
@ -425,10 +426,10 @@ func writeError(w http.ResponseWriter, err error) {
|
||||
}
|
||||
}
|
||||
|
||||
// writeEvent serializes a single Event and writes the resulting
|
||||
// JSON to the given ResponseWriter, along with the appropriate
|
||||
// headers
|
||||
func writeEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTimer) error {
|
||||
// writeKeyEvent trims the prefix of key path in a single Event under
|
||||
// StoreKeysPrefix, serializes it and writes the resulting JSON to the given
|
||||
// ResponseWriter, along with the appropriate headers.
|
||||
func writeKeyEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTimer) error {
|
||||
if ev == nil {
|
||||
return errors.New("cannot write empty Event!")
|
||||
}
|
||||
@ -441,10 +442,11 @@ func writeEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTimer)
|
||||
w.WriteHeader(http.StatusCreated)
|
||||
}
|
||||
|
||||
ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix)
|
||||
return json.NewEncoder(w).Encode(ev)
|
||||
}
|
||||
|
||||
func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) {
|
||||
func handleKeyWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) {
|
||||
defer wa.Remove()
|
||||
ech := wa.EventChan()
|
||||
var nch <-chan bool
|
||||
@ -476,6 +478,7 @@ func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, s
|
||||
// send to the client in time. Then we simply end streaming.
|
||||
return
|
||||
}
|
||||
ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix)
|
||||
if err := json.NewEncoder(w).Encode(ev); err != nil {
|
||||
// Should never be reached
|
||||
log.Printf("error writing event: %v\n", err)
|
||||
@ -502,3 +505,23 @@ func allowMethod(w http.ResponseWriter, m string, ms ...string) bool {
|
||||
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
|
||||
return false
|
||||
}
|
||||
|
||||
func trimEventPrefix(ev *store.Event, prefix string) *store.Event {
|
||||
if ev == nil {
|
||||
return nil
|
||||
}
|
||||
ev.Node = trimNodeExternPrefix(ev.Node, prefix)
|
||||
ev.PrevNode = trimNodeExternPrefix(ev.PrevNode, prefix)
|
||||
return ev
|
||||
}
|
||||
|
||||
func trimNodeExternPrefix(n *store.NodeExtern, prefix string) *store.NodeExtern {
|
||||
if n == nil {
|
||||
return nil
|
||||
}
|
||||
n.Key = strings.TrimPrefix(n.Key, prefix)
|
||||
for _, nn := range n.Nodes {
|
||||
nn = trimNodeExternPrefix(nn, prefix)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
@ -211,7 +211,7 @@ func TestBadParseRequest(t *testing.T) {
|
||||
},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
got, err := parseRequest(tt.in, 1234, clockwork.NewFakeClock())
|
||||
got, err := parseKeyRequest(tt.in, 1234, clockwork.NewFakeClock())
|
||||
if err == nil {
|
||||
t.Errorf("#%d: unexpected nil error!", i)
|
||||
continue
|
||||
@ -244,7 +244,7 @@ func TestGoodParseRequest(t *testing.T) {
|
||||
etcdserverpb.Request{
|
||||
ID: 1234,
|
||||
Method: "GET",
|
||||
Path: "/foo",
|
||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -258,7 +258,7 @@ func TestGoodParseRequest(t *testing.T) {
|
||||
ID: 1234,
|
||||
Method: "PUT",
|
||||
Val: "some_value",
|
||||
Path: "/foo",
|
||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -272,7 +272,7 @@ func TestGoodParseRequest(t *testing.T) {
|
||||
ID: 1234,
|
||||
Method: "PUT",
|
||||
PrevIndex: 98765,
|
||||
Path: "/foo",
|
||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -286,7 +286,7 @@ func TestGoodParseRequest(t *testing.T) {
|
||||
ID: 1234,
|
||||
Method: "PUT",
|
||||
Recursive: true,
|
||||
Path: "/foo",
|
||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -300,7 +300,7 @@ func TestGoodParseRequest(t *testing.T) {
|
||||
ID: 1234,
|
||||
Method: "PUT",
|
||||
Sorted: true,
|
||||
Path: "/foo",
|
||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -310,7 +310,7 @@ func TestGoodParseRequest(t *testing.T) {
|
||||
ID: 1234,
|
||||
Method: "GET",
|
||||
Wait: true,
|
||||
Path: "/foo",
|
||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -319,7 +319,7 @@ func TestGoodParseRequest(t *testing.T) {
|
||||
etcdserverpb.Request{
|
||||
ID: 1234,
|
||||
Method: "GET",
|
||||
Path: "/foo",
|
||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||
Expiration: 0,
|
||||
},
|
||||
},
|
||||
@ -329,7 +329,7 @@ func TestGoodParseRequest(t *testing.T) {
|
||||
etcdserverpb.Request{
|
||||
ID: 1234,
|
||||
Method: "GET",
|
||||
Path: "/foo",
|
||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||
Expiration: fc.Now().Add(5678 * time.Second).UnixNano(),
|
||||
},
|
||||
},
|
||||
@ -339,7 +339,7 @@ func TestGoodParseRequest(t *testing.T) {
|
||||
etcdserverpb.Request{
|
||||
ID: 1234,
|
||||
Method: "GET",
|
||||
Path: "/foo",
|
||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||
Expiration: fc.Now().UnixNano(),
|
||||
},
|
||||
},
|
||||
@ -350,7 +350,7 @@ func TestGoodParseRequest(t *testing.T) {
|
||||
ID: 1234,
|
||||
Method: "GET",
|
||||
Dir: true,
|
||||
Path: "/foo",
|
||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -360,7 +360,7 @@ func TestGoodParseRequest(t *testing.T) {
|
||||
ID: 1234,
|
||||
Method: "GET",
|
||||
Dir: false,
|
||||
Path: "/foo",
|
||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -374,7 +374,7 @@ func TestGoodParseRequest(t *testing.T) {
|
||||
ID: 1234,
|
||||
Method: "PUT",
|
||||
PrevExist: boolp(true),
|
||||
Path: "/foo",
|
||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -388,7 +388,7 @@ func TestGoodParseRequest(t *testing.T) {
|
||||
ID: 1234,
|
||||
Method: "PUT",
|
||||
PrevExist: boolp(false),
|
||||
Path: "/foo",
|
||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||
},
|
||||
},
|
||||
// mix various fields
|
||||
@ -408,7 +408,7 @@ func TestGoodParseRequest(t *testing.T) {
|
||||
PrevExist: boolp(true),
|
||||
PrevValue: "previous value",
|
||||
Val: "some value",
|
||||
Path: "/foo",
|
||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||
},
|
||||
},
|
||||
// query parameters should be used if given
|
||||
@ -422,7 +422,7 @@ func TestGoodParseRequest(t *testing.T) {
|
||||
ID: 1234,
|
||||
Method: "PUT",
|
||||
PrevValue: "woof",
|
||||
Path: "/foo",
|
||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||
},
|
||||
},
|
||||
// but form values should take precedence over query parameters
|
||||
@ -438,13 +438,13 @@ func TestGoodParseRequest(t *testing.T) {
|
||||
ID: 1234,
|
||||
Method: "PUT",
|
||||
PrevValue: "miaow",
|
||||
Path: "/foo",
|
||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
got, err := parseRequest(tt.in, 1234, fc)
|
||||
got, err := parseKeyRequest(tt.in, 1234, fc)
|
||||
if err != nil {
|
||||
t.Errorf("#%d: err = %v, want %v", i, err, nil)
|
||||
}
|
||||
@ -526,7 +526,7 @@ func (drt dummyRaftTimer) Term() uint64 { return uint64(5) }
|
||||
func TestWriteEvent(t *testing.T) {
|
||||
// nil event should not panic
|
||||
rw := httptest.NewRecorder()
|
||||
writeEvent(rw, nil, dummyRaftTimer{})
|
||||
writeKeyEvent(rw, nil, dummyRaftTimer{})
|
||||
h := rw.Header()
|
||||
if len(h) > 0 {
|
||||
t.Fatalf("unexpected non-empty headers: %#v", h)
|
||||
@ -569,7 +569,7 @@ func TestWriteEvent(t *testing.T) {
|
||||
|
||||
for i, tt := range tests {
|
||||
rw := httptest.NewRecorder()
|
||||
writeEvent(rw, tt.ev, dummyRaftTimer{})
|
||||
writeKeyEvent(rw, tt.ev, dummyRaftTimer{})
|
||||
if gct := rw.Header().Get("Content-Type"); gct != "application/json" {
|
||||
t.Errorf("case %d: bad Content-Type: got %q, want application/json", i, gct)
|
||||
}
|
||||
@ -1240,7 +1240,7 @@ func TestHandleWatch(t *testing.T) {
|
||||
}
|
||||
tt.doToChan(wa.echan)
|
||||
|
||||
handleWatch(tt.getCtx(), rw, wa, false, dummyRaftTimer{})
|
||||
handleKeyWatch(tt.getCtx(), rw, wa, false, dummyRaftTimer{})
|
||||
|
||||
wcode := http.StatusOK
|
||||
wct := "application/json"
|
||||
@ -1295,7 +1295,7 @@ func TestHandleWatchStreaming(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
handleWatch(ctx, rw, wa, true, dummyRaftTimer{})
|
||||
handleKeyWatch(ctx, rw, wa, true, dummyRaftTimer{})
|
||||
close(done)
|
||||
}()
|
||||
|
||||
@ -1561,6 +1561,86 @@ func TestServeAdminMembersDelete(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestTrimEventPrefix(t *testing.T) {
|
||||
pre := "/abc"
|
||||
tests := []struct {
|
||||
ev *store.Event
|
||||
wev *store.Event
|
||||
}{
|
||||
{
|
||||
nil,
|
||||
nil,
|
||||
},
|
||||
{
|
||||
&store.Event{},
|
||||
&store.Event{},
|
||||
},
|
||||
{
|
||||
&store.Event{Node: &store.NodeExtern{Key: "/abc/def"}},
|
||||
&store.Event{Node: &store.NodeExtern{Key: "/def"}},
|
||||
},
|
||||
{
|
||||
&store.Event{PrevNode: &store.NodeExtern{Key: "/abc/ghi"}},
|
||||
&store.Event{PrevNode: &store.NodeExtern{Key: "/ghi"}},
|
||||
},
|
||||
{
|
||||
&store.Event{
|
||||
Node: &store.NodeExtern{Key: "/abc/def"},
|
||||
PrevNode: &store.NodeExtern{Key: "/abc/ghi"},
|
||||
},
|
||||
&store.Event{
|
||||
Node: &store.NodeExtern{Key: "/def"},
|
||||
PrevNode: &store.NodeExtern{Key: "/ghi"},
|
||||
},
|
||||
},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
ev := trimEventPrefix(tt.ev, pre)
|
||||
if !reflect.DeepEqual(ev, tt.wev) {
|
||||
t.Errorf("#%d: event = %+v, want %+v", i, ev, tt.wev)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestTrimNodeExternPrefix(t *testing.T) {
|
||||
pre := "/abc"
|
||||
tests := []struct {
|
||||
n *store.NodeExtern
|
||||
wn *store.NodeExtern
|
||||
}{
|
||||
{
|
||||
nil,
|
||||
nil,
|
||||
},
|
||||
{
|
||||
&store.NodeExtern{Key: "/abc/def"},
|
||||
&store.NodeExtern{Key: "/def"},
|
||||
},
|
||||
{
|
||||
&store.NodeExtern{
|
||||
Key: "/abc/def",
|
||||
Nodes: []*store.NodeExtern{
|
||||
{Key: "/abc/def/1"},
|
||||
{Key: "/abc/def/2"},
|
||||
},
|
||||
},
|
||||
&store.NodeExtern{
|
||||
Key: "/def",
|
||||
Nodes: []*store.NodeExtern{
|
||||
{Key: "/def/1"},
|
||||
{Key: "/def/2"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
n := trimNodeExternPrefix(tt.n, pre)
|
||||
if !reflect.DeepEqual(n, tt.wn) {
|
||||
t.Errorf("#%d: node = %+v, want %+v", i, n, tt.wn)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type fakeCluster struct {
|
||||
members []etcdserver.Member
|
||||
}
|
||||
|
@ -28,8 +28,6 @@ import (
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
)
|
||||
|
||||
const membersKVPrefix = "/_etcd/members/"
|
||||
|
||||
// RaftAttributes represents the raft related attributes of an etcd member.
|
||||
type RaftAttributes struct {
|
||||
// TODO(philips): ensure these are URLs
|
||||
@ -71,7 +69,7 @@ func newMember(name string, peerURLs types.URLs, now *time.Time) *Member {
|
||||
}
|
||||
|
||||
func (m Member) storeKey() string {
|
||||
return path.Join(membersKVPrefix, idAsHex(m.ID))
|
||||
return path.Join(storeMembersPrefix, idAsHex(m.ID))
|
||||
}
|
||||
|
||||
func parseMemberID(key string) uint64 {
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
"log"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@ -47,6 +48,9 @@ const (
|
||||
DefaultSnapCount = 10000
|
||||
// TODO: calculate based on heartbeat interval
|
||||
defaultPublishRetryInterval = 5 * time.Second
|
||||
|
||||
StoreAdminPrefix = "/0"
|
||||
StoreKeysPrefix = "/1"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -55,6 +59,8 @@ var (
|
||||
ErrIDRemoved = errors.New("etcdserver: ID removed")
|
||||
ErrIDExists = errors.New("etcdserver: ID exists")
|
||||
ErrIDNotFound = errors.New("etcdserver: ID not found")
|
||||
|
||||
storeMembersPrefix = path.Join(StoreAdminPrefix, "members")
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
Loading…
x
Reference in New Issue
Block a user