mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver/etcdhttp: store location adjustment
Detailed adjustment: /_etcd/machines/* -> /0/members/* /* -> /1/* And it keeps key path returned to user the same as before.
This commit is contained in:
parent
0398a31b16
commit
2ff3cac653
@ -77,7 +77,7 @@ func (s *clusterStore) Add(m Member) {
|
|||||||
func (s *clusterStore) Get() Cluster {
|
func (s *clusterStore) Get() Cluster {
|
||||||
c := NewCluster()
|
c := NewCluster()
|
||||||
c.id = s.id
|
c.id = s.id
|
||||||
e, err := s.Store.Get(membersKVPrefix, true, true)
|
e, err := s.Store.Get(membersDir, true, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if v, ok := err.(*etcdErr.Error); ok && v.ErrorCode == etcdErr.EcodeKeyNotFound {
|
if v, ok := err.(*etcdErr.Error); ok && v.ErrorCode == etcdErr.EcodeKeyNotFound {
|
||||||
return *c
|
return *c
|
||||||
|
@ -34,7 +34,7 @@ func TestClusterStoreAdd(t *testing.T) {
|
|||||||
{
|
{
|
||||||
name: "Create",
|
name: "Create",
|
||||||
params: []interface{}{
|
params: []interface{}{
|
||||||
membersKVPrefix + "1/raftAttributes",
|
membersDir + "/1/raftAttributes",
|
||||||
false,
|
false,
|
||||||
`{"PeerURLs":null}`,
|
`{"PeerURLs":null}`,
|
||||||
false,
|
false,
|
||||||
@ -44,7 +44,7 @@ func TestClusterStoreAdd(t *testing.T) {
|
|||||||
{
|
{
|
||||||
name: "Create",
|
name: "Create",
|
||||||
params: []interface{}{
|
params: []interface{}{
|
||||||
membersKVPrefix + "1/attributes",
|
membersDir + "/1/attributes",
|
||||||
false,
|
false,
|
||||||
`{"Name":"node1","ClientURLs":null}`,
|
`{"Name":"node1","ClientURLs":null}`,
|
||||||
false,
|
false,
|
||||||
@ -113,7 +113,7 @@ func TestClusterStoreDelete(t *testing.T) {
|
|||||||
cs.Add(newTestMember(1, nil, "node1", nil))
|
cs.Add(newTestMember(1, nil, "node1", nil))
|
||||||
cs.Remove(1)
|
cs.Remove(1)
|
||||||
|
|
||||||
wdeletes := []string{membersKVPrefix + "1"}
|
wdeletes := []string{membersDir + "/1"}
|
||||||
if !reflect.DeepEqual(st.deletes, wdeletes) {
|
if !reflect.DeepEqual(st.deletes, wdeletes) {
|
||||||
t.Errorf("deletes = %v, want %v", st.deletes, wdeletes)
|
t.Errorf("deletes = %v, want %v", st.deletes, wdeletes)
|
||||||
}
|
}
|
||||||
|
@ -113,6 +113,7 @@ func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rr.Path = etcdserver.KeysDir + rr.Path
|
||||||
resp, err := h.server.Do(ctx, rr)
|
resp, err := h.server.Do(ctx, rr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeError(w, err)
|
writeError(w, err)
|
||||||
@ -121,14 +122,15 @@ func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
switch {
|
switch {
|
||||||
case resp.Event != nil:
|
case resp.Event != nil:
|
||||||
if err := writeEvent(w, resp.Event, h.timer); err != nil {
|
ev := trimEventPrefix(resp.Event, etcdserver.KeysDir)
|
||||||
|
if err := writeEvent(w, ev, h.timer); err != nil {
|
||||||
// Should never be reached
|
// Should never be reached
|
||||||
log.Printf("error writing event: %v", err)
|
log.Printf("error writing event: %v", err)
|
||||||
}
|
}
|
||||||
case resp.Watcher != nil:
|
case resp.Watcher != nil:
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
handleWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
|
handleKeyWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
|
||||||
default:
|
default:
|
||||||
writeError(w, errors.New("received response with no Event/Watcher!"))
|
writeError(w, errors.New("received response with no Event/Watcher!"))
|
||||||
}
|
}
|
||||||
@ -444,7 +446,7 @@ func writeEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTimer)
|
|||||||
return json.NewEncoder(w).Encode(ev)
|
return json.NewEncoder(w).Encode(ev)
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) {
|
func handleKeyWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) {
|
||||||
defer wa.Remove()
|
defer wa.Remove()
|
||||||
ech := wa.EventChan()
|
ech := wa.EventChan()
|
||||||
var nch <-chan bool
|
var nch <-chan bool
|
||||||
@ -476,6 +478,7 @@ func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, s
|
|||||||
// send to the client in time. Then we simply end streaming.
|
// send to the client in time. Then we simply end streaming.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
ev = trimEventPrefix(ev, etcdserver.KeysDir)
|
||||||
if err := json.NewEncoder(w).Encode(ev); err != nil {
|
if err := json.NewEncoder(w).Encode(ev); err != nil {
|
||||||
// Should never be reached
|
// Should never be reached
|
||||||
log.Printf("error writing event: %v\n", err)
|
log.Printf("error writing event: %v\n", err)
|
||||||
@ -502,3 +505,23 @@ func allowMethod(w http.ResponseWriter, m string, ms ...string) bool {
|
|||||||
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
|
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func trimEventPrefix(ev *store.Event, pre string) *store.Event {
|
||||||
|
if ev == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
ev.Node = trimNodeExternPrefix(ev.Node, pre)
|
||||||
|
ev.PrevNode = trimNodeExternPrefix(ev.PrevNode, pre)
|
||||||
|
return ev
|
||||||
|
}
|
||||||
|
|
||||||
|
func trimNodeExternPrefix(n *store.NodeExtern, pre string) *store.NodeExtern {
|
||||||
|
if n == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
n.Key = strings.TrimPrefix(n.Key, pre)
|
||||||
|
for _, nn := range n.Nodes {
|
||||||
|
nn = trimNodeExternPrefix(nn, pre)
|
||||||
|
}
|
||||||
|
return n
|
||||||
|
}
|
||||||
|
@ -1240,7 +1240,7 @@ func TestHandleWatch(t *testing.T) {
|
|||||||
}
|
}
|
||||||
tt.doToChan(wa.echan)
|
tt.doToChan(wa.echan)
|
||||||
|
|
||||||
handleWatch(tt.getCtx(), rw, wa, false, dummyRaftTimer{})
|
handleKeyWatch(tt.getCtx(), rw, wa, false, dummyRaftTimer{})
|
||||||
|
|
||||||
wcode := http.StatusOK
|
wcode := http.StatusOK
|
||||||
wct := "application/json"
|
wct := "application/json"
|
||||||
@ -1295,7 +1295,7 @@ func TestHandleWatchStreaming(t *testing.T) {
|
|||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
handleWatch(ctx, rw, wa, true, dummyRaftTimer{})
|
handleKeyWatch(ctx, rw, wa, true, dummyRaftTimer{})
|
||||||
close(done)
|
close(done)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -1561,6 +1561,86 @@ func TestServeAdminMembersDelete(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTrimEventPrefix(t *testing.T) {
|
||||||
|
pre := "/abc"
|
||||||
|
tests := []struct {
|
||||||
|
ev *store.Event
|
||||||
|
wev *store.Event
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
nil,
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
&store.Event{},
|
||||||
|
&store.Event{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
&store.Event{Node: &store.NodeExtern{Key: "/abc/def"}},
|
||||||
|
&store.Event{Node: &store.NodeExtern{Key: "/def"}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
&store.Event{PrevNode: &store.NodeExtern{Key: "/abc/ghi"}},
|
||||||
|
&store.Event{PrevNode: &store.NodeExtern{Key: "/ghi"}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
&store.Event{
|
||||||
|
Node: &store.NodeExtern{Key: "/abc/def"},
|
||||||
|
PrevNode: &store.NodeExtern{Key: "/abc/ghi"},
|
||||||
|
},
|
||||||
|
&store.Event{
|
||||||
|
Node: &store.NodeExtern{Key: "/def"},
|
||||||
|
PrevNode: &store.NodeExtern{Key: "/ghi"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for i, tt := range tests {
|
||||||
|
ev := trimEventPrefix(tt.ev, pre)
|
||||||
|
if !reflect.DeepEqual(ev, tt.wev) {
|
||||||
|
t.Errorf("#%d: event = %+v, want %+v", i, ev, tt.wev)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTrimNodeExternPrefix(t *testing.T) {
|
||||||
|
pre := "/abc"
|
||||||
|
tests := []struct {
|
||||||
|
n *store.NodeExtern
|
||||||
|
wn *store.NodeExtern
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
nil,
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
&store.NodeExtern{Key: "/abc/def"},
|
||||||
|
&store.NodeExtern{Key: "/def"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
&store.NodeExtern{
|
||||||
|
Key: "/abc/def",
|
||||||
|
Nodes: []*store.NodeExtern{
|
||||||
|
{Key: "/abc/def/1"},
|
||||||
|
{Key: "/abc/def/2"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
&store.NodeExtern{
|
||||||
|
Key: "/def",
|
||||||
|
Nodes: []*store.NodeExtern{
|
||||||
|
{Key: "/def/1"},
|
||||||
|
{Key: "/def/2"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for i, tt := range tests {
|
||||||
|
n := trimNodeExternPrefix(tt.n, pre)
|
||||||
|
if !reflect.DeepEqual(n, tt.wn) {
|
||||||
|
t.Errorf("#%d: node = %+v, want %+v", i, n, tt.wn)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type fakeCluster struct {
|
type fakeCluster struct {
|
||||||
members []etcdserver.Member
|
members []etcdserver.Member
|
||||||
}
|
}
|
||||||
|
@ -28,8 +28,6 @@ import (
|
|||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
const membersKVPrefix = "/_etcd/members/"
|
|
||||||
|
|
||||||
// RaftAttributes represents the raft related attributes of an etcd member.
|
// RaftAttributes represents the raft related attributes of an etcd member.
|
||||||
type RaftAttributes struct {
|
type RaftAttributes struct {
|
||||||
// TODO(philips): ensure these are URLs
|
// TODO(philips): ensure these are URLs
|
||||||
@ -71,7 +69,7 @@ func newMember(name string, peerURLs types.URLs, now *time.Time) *Member {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m Member) storeKey() string {
|
func (m Member) storeKey() string {
|
||||||
return path.Join(membersKVPrefix, idAsHex(m.ID))
|
return path.Join(membersDir, idAsHex(m.ID))
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseMemberID(key string) uint64 {
|
func parseMemberID(key string) uint64 {
|
||||||
|
@ -47,6 +47,10 @@ const (
|
|||||||
DefaultSnapCount = 10000
|
DefaultSnapCount = 10000
|
||||||
// TODO: calculate based on heartbeat interval
|
// TODO: calculate based on heartbeat interval
|
||||||
defaultPublishRetryInterval = 5 * time.Second
|
defaultPublishRetryInterval = 5 * time.Second
|
||||||
|
|
||||||
|
AdminDir = "/0"
|
||||||
|
membersDir = AdminDir + "/members"
|
||||||
|
KeysDir = "/1"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
Loading…
x
Reference in New Issue
Block a user