From 77271b066347310859eb05bcf9d03ef52fac4942 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 2 Oct 2014 15:48:58 -0700 Subject: [PATCH] etcdserver: split Member into RaftAttributes and Attributes The split helps to save them in different key paths, because they have distinct life cycle on update. --- etcdserver/cluster_store.go | 57 +++++++++--- etcdserver/cluster_store_test.go | 151 +++++++++++++++++++++++-------- etcdserver/cluster_test.go | 74 +++++++-------- etcdserver/etcdhttp/http_test.go | 6 +- etcdserver/member.go | 32 ++++++- etcdserver/server.go | 22 ++--- etcdserver/server_test.go | 59 +++++------- 7 files changed, 253 insertions(+), 148 deletions(-) diff --git a/etcdserver/cluster_store.go b/etcdserver/cluster_store.go index cfad522b6..14a17ae60 100644 --- a/etcdserver/cluster_store.go +++ b/etcdserver/cluster_store.go @@ -7,12 +7,16 @@ import ( "log" "net/http" + etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/store" ) const ( raftPrefix = "/raft" + + raftAttributesSuffix = "/raftAttributes" + attributesSuffix = "/attributes" ) type ClusterStore interface { @@ -36,13 +40,20 @@ func NewClusterStore(st store.Store, c Cluster) ClusterStore { // Add puts a new Member into the store. // A Member with a matching id must not exist. func (s *clusterStore) Add(m Member) { - b, err := json.Marshal(m) + b, err := json.Marshal(m.RaftAttributes) if err != nil { - log.Panicf("marshal peer info error: %v", err) + log.Panicf("marshal error: %v", err) + } + if _, err := s.Store.Create(m.storeKey()+raftAttributesSuffix, false, string(b), false, store.Permanent); err != nil { + log.Panicf("add raftAttributes should never fail: %v", err) } - if _, err := s.Store.Create(m.storeKey(), false, string(b), false, store.Permanent); err != nil { - log.Panicf("add member should never fail: %v", err) + b, err = json.Marshal(m.Attributes) + if err != nil { + log.Panicf("marshal error: %v", err) + } + if _, err := s.Store.Create(m.storeKey()+attributesSuffix, false, string(b), false, store.Permanent); err != nil { + log.Panicf("add attributes should never fail: %v", err) } } @@ -50,28 +61,52 @@ func (s *clusterStore) Add(m Member) { // lock here. func (s *clusterStore) Get() Cluster { c := &Cluster{} - e, err := s.Store.Get(machineKVPrefix, true, false) + e, err := s.Store.Get(machineKVPrefix, true, true) if err != nil { + if v, ok := err.(*etcdErr.Error); ok && v.ErrorCode == etcdErr.EcodeKeyNotFound { + return *c + } log.Panicf("get member should never fail: %v", err) } for _, n := range e.Node.Nodes { - m := Member{} - if err := json.Unmarshal([]byte(*n.Value), &m); err != nil { - log.Panicf("unmarshal peer error: %v", err) - } - err := c.Add(m) + m, err := nodeToMember(n) if err != nil { + log.Panicf("unexpected nodeToMember error: %v", err) + } + if err := c.Add(m); err != nil { log.Panicf("add member to cluster should never fail: %v", err) } } return *c } +// nodeToMember builds member through a store node. +// the child nodes of the given node should be sorted by key. +func nodeToMember(n *store.NodeExtern) (Member, error) { + m := Member{ID: parseMemberID(n.Key)} + if len(n.Nodes) != 2 { + return m, fmt.Errorf("len(nodes) = %d, want 2", len(n.Nodes)) + } + if w := n.Key + attributesSuffix; n.Nodes[0].Key != w { + return m, fmt.Errorf("key = %v, want %v", n.Nodes[0].Key, w) + } + if err := json.Unmarshal([]byte(*n.Nodes[0].Value), &m.Attributes); err != nil { + return m, fmt.Errorf("unmarshal attributes error: %v", err) + } + if w := n.Key + raftAttributesSuffix; n.Nodes[1].Key != w { + return m, fmt.Errorf("key = %v, want %v", n.Nodes[1].Key, w) + } + if err := json.Unmarshal([]byte(*n.Nodes[1].Value), &m.RaftAttributes); err != nil { + return m, fmt.Errorf("unmarshal raftAttributes error: %v", err) + } + return m, nil +} + // Remove removes a member from the store. // The given id MUST exist. func (s *clusterStore) Remove(id uint64) { p := s.Get().FindID(id).storeKey() - if _, err := s.Store.Delete(p, false, false); err != nil { + if _, err := s.Store.Delete(p, true, true); err != nil { log.Panicf("delete peer should never fail: %v", err) } } diff --git a/etcdserver/cluster_store_test.go b/etcdserver/cluster_store_test.go index 02167bfc6..3357d38df 100644 --- a/etcdserver/cluster_store_test.go +++ b/etcdserver/cluster_store_test.go @@ -12,22 +12,32 @@ import ( func TestClusterStoreAdd(t *testing.T) { st := &storeRecorder{} ps := &clusterStore{Store: st} - ps.Add(Member{Name: "node", ID: 1}) + ps.Add(newTestMember(1, nil, "node1", nil)) wactions := []action{ { name: "Create", params: []interface{}{ - machineKVPrefix + "1", + machineKVPrefix + "1/raftAttributes", false, - `{"ID":1,"Name":"node","PeerURLs":null,"ClientURLs":null}`, + `{"PeerURLs":null}`, + false, + store.Permanent, + }, + }, + { + name: "Create", + params: []interface{}{ + machineKVPrefix + "1/attributes", + false, + `{"Name":"node1","ClientURLs":null}`, false, store.Permanent, }, }, } if g := st.Action(); !reflect.DeepEqual(g, wactions) { - t.Error("actions = %v, want %v", g, wactions) + t.Errorf("actions = %v, want %v", g, wactions) } } @@ -37,20 +47,32 @@ func TestClusterStoreGet(t *testing.T) { wmems []Member }{ { - []Member{{Name: "node1", ID: 1}}, - []Member{{Name: "node1", ID: 1}}, + []Member{newTestMember(1, nil, "node1", nil)}, + []Member{newTestMember(1, nil, "node1", nil)}, }, { []Member{}, []Member{}, }, { - []Member{{Name: "node1", ID: 1}, {Name: "node2", ID: 2}}, - []Member{{Name: "node1", ID: 1}, {Name: "node2", ID: 2}}, + []Member{ + newTestMember(1, nil, "node1", nil), + newTestMember(2, nil, "node2", nil), + }, + []Member{ + newTestMember(1, nil, "node1", nil), + newTestMember(2, nil, "node2", nil), + }, }, { - []Member{{Name: "node2", ID: 2}, {Name: "node1", ID: 1}}, - []Member{{Name: "node1", ID: 1}, {Name: "node2", ID: 2}}, + []Member{ + newTestMember(2, nil, "node2", nil), + newTestMember(1, nil, "node1", nil), + }, + []Member{ + newTestMember(1, nil, "node1", nil), + newTestMember(2, nil, "node2", nil), + }, }, } for i, tt := range tests { @@ -60,7 +82,7 @@ func TestClusterStoreGet(t *testing.T) { t.Error(err) } - cs := NewClusterStore(&getAllStore{}, c) + cs := NewClusterStore(newGetAllStore(), c) if g := cs.Get(); !reflect.DeepEqual(g, c) { t.Errorf("#%d: mems = %v, want %v", i, g, c) @@ -69,9 +91,9 @@ func TestClusterStoreGet(t *testing.T) { } func TestClusterStoreDelete(t *testing.T) { - st := &storeGetAllDeleteRecorder{} + st := newStoreGetAllAndDeleteRecorder() c := Cluster{} - c.Add(Member{Name: "node", ID: 1}) + c.Add(newTestMember(1, nil, "node1", nil)) cs := NewClusterStore(st, c) cs.Remove(1) @@ -81,6 +103,53 @@ func TestClusterStoreDelete(t *testing.T) { } } +func TestNodeToMemberFail(t *testing.T) { + tests := []*store.NodeExtern{ + {Key: "/1234", Nodes: []*store.NodeExtern{ + {Key: "/1234/strange"}, + }}, + {Key: "/1234", Nodes: []*store.NodeExtern{ + {Key: "/1234/dynamic", Value: stringp("garbage")}, + }}, + {Key: "/1234", Nodes: []*store.NodeExtern{ + {Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)}, + }}, + {Key: "/1234", Nodes: []*store.NodeExtern{ + {Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)}, + {Key: "/1234/strange"}, + }}, + {Key: "/1234", Nodes: []*store.NodeExtern{ + {Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)}, + {Key: "/1234/static", Value: stringp("garbage")}, + }}, + {Key: "/1234", Nodes: []*store.NodeExtern{ + {Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)}, + {Key: "/1234/static", Value: stringp(`{"Name":"node1","ClientURLs":null}`)}, + {Key: "/1234/strange"}, + }}, + } + for i, tt := range tests { + if _, err := nodeToMember(tt); err == nil { + t.Errorf("#%d: unexpected nil error", i) + } + } +} + +func TestNodeToMember(t *testing.T) { + n := &store.NodeExtern{Key: "/1234", Nodes: []*store.NodeExtern{ + {Key: "/1234/attributes", Value: stringp(`{"Name":"node1","ClientURLs":null}`)}, + {Key: "/1234/raftAttributes", Value: stringp(`{"PeerURLs":null}`)}, + }} + wm := Member{ID: 0x1234, RaftAttributes: RaftAttributes{}, Attributes: Attributes{Name: "node1"}} + m, err := nodeToMember(n) + if err != nil { + t.Fatalf("unexpected nodeToMember error: %v", err) + } + if !reflect.DeepEqual(m, wm) { + t.Errorf("member = %+v, want %+v", m, wm) + } +} + // simpleStore implements basic create and get. type simpleStore struct { storeRecorder @@ -103,35 +172,41 @@ func (s *simpleStore) Get(key string, _, _ bool) (*store.Event, error) { return ev, nil } -// getAllStore inherits simpleStore, and makes Get return all keys. +// getAllStore embeds simpleStore, and makes Get return all keys sorted. +// It uses real store because it uses lots of logic in store and is not easy +// to mock. +// TODO: use mock one to do testing type getAllStore struct { - simpleStore + store.Store } -func (s *getAllStore) Get(_ string, _, _ bool) (*store.Event, error) { - nodes := make([]*store.NodeExtern, 0) - for k, v := range s.st { - nodes = append(nodes, &store.NodeExtern{Key: k, Value: stringp(v)}) +func newGetAllStore() *getAllStore { + return &getAllStore{store.New()} +} + +type storeGetAllAndDeleteRecorder struct { + *getAllStore + deletes []string +} + +func newStoreGetAllAndDeleteRecorder() *storeGetAllAndDeleteRecorder { + return &storeGetAllAndDeleteRecorder{getAllStore: newGetAllStore()} +} + +func (s *storeGetAllAndDeleteRecorder) Delete(key string, _, _ bool) (*store.Event, error) { + s.deletes = append(s.deletes, key) + return nil, nil +} + +func newTestMember(id uint64, peerURLs []string, name string, clientURLs []string) Member { + return Member{ + ID: id, + RaftAttributes: RaftAttributes{PeerURLs: peerURLs}, + Attributes: Attributes{Name: name, ClientURLs: clientURLs}, } - return &store.Event{Node: &store.NodeExtern{Nodes: nodes}}, nil } -type storeDeleteRecorder struct { - storeRecorder - deletes []string -} - -func (s *storeDeleteRecorder) Delete(key string, _, _ bool) (*store.Event, error) { - s.deletes = append(s.deletes, key) - return nil, nil -} - -type storeGetAllDeleteRecorder struct { - getAllStore - deletes []string -} - -func (s *storeGetAllDeleteRecorder) Delete(key string, _, _ bool) (*store.Event, error) { - s.deletes = append(s.deletes, key) - return nil, nil +func newTestMemberp(id uint64, peerURLs []string, name string, clientURLs []string) *Member { + m := newTestMember(id, peerURLs, name, clientURLs) + return &m } diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go index 6bd1ae579..ee7700348 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/cluster_test.go @@ -18,19 +18,13 @@ func TestClusterAddSlice(t *testing.T) { }, { []Member{ - {ID: 1, PeerURLs: []string{"foo", "bar"}}, - {ID: 2, PeerURLs: []string{"baz"}}, + newTestMember(1, []string{"foo", "bar"}, "", nil), + newTestMember(2, []string{"baz"}, "", nil), }, &Cluster{ - 1: &Member{ - ID: 1, - PeerURLs: []string{"foo", "bar"}, - }, - 2: &Member{ - ID: 2, - PeerURLs: []string{"baz"}, - }, + 1: newTestMemberp(1, []string{"foo", "bar"}, "", nil), + 2: newTestMemberp(2, []string{"baz"}, "", nil), }, }, } @@ -48,18 +42,18 @@ func TestClusterAddSlice(t *testing.T) { func TestClusterAddSliceBad(t *testing.T) { c := Cluster{ - 1: &Member{ID: 1}, + 1: newTestMemberp(1, nil, "", nil), } - if err := c.AddSlice([]Member{{ID: 1}}); err == nil { + if err := c.AddSlice([]Member{newTestMember(1, nil, "", nil)}); err == nil { t.Error("want err, but got nil") } } func TestClusterPick(t *testing.T) { cs := Cluster{ - 1: &Member{ID: 1, PeerURLs: []string{"abc", "def", "ghi", "jkl", "mno", "pqr", "stu"}}, - 2: &Member{ID: 2, PeerURLs: []string{"xyz"}}, - 3: &Member{ID: 3, PeerURLs: []string{}}, + 1: newTestMemberp(1, []string{"abc", "def", "ghi", "jkl", "mno", "pqr", "stu"}, "", nil), + 2: newTestMemberp(2, []string{"xyz"}, "", nil), + 3: newTestMemberp(3, []string{}, "", nil), } ids := map[string]bool{ "abc": true, @@ -98,7 +92,7 @@ func TestClusterFind(t *testing.T) { { 1, "node1", - []Member{{Name: "node1", ID: 1}}, + []Member{newTestMember(1, nil, "node1", nil)}, true, }, { @@ -110,13 +104,13 @@ func TestClusterFind(t *testing.T) { { 2, "node2", - []Member{{Name: "node1", ID: 1}, {Name: "node2", ID: 2}}, + []Member{newTestMember(1, nil, "node1", nil), newTestMember(2, nil, "node2", nil)}, true, }, { 3, "node3", - []Member{{Name: "node1", ID: 1}, {Name: "node2", ID: 2}}, + []Member{newTestMember(1, nil, "node1", nil), newTestMember(2, nil, "node2", nil)}, false, }, } @@ -161,9 +155,9 @@ func TestClusterSet(t *testing.T) { { "mem1=http://10.0.0.1:2379,mem1=http://128.193.4.20:2379,mem2=http://10.0.0.2:2379,default=http://127.0.0.1:2379", []Member{ - {ID: 3736794188555456841, Name: "mem1", PeerURLs: []string{"http://10.0.0.1:2379", "http://128.193.4.20:2379"}}, - {ID: 5674507346857578431, Name: "mem2", PeerURLs: []string{"http://10.0.0.2:2379"}}, - {ID: 2676999861503984872, Name: "default", PeerURLs: []string{"http://127.0.0.1:2379"}}, + newTestMember(3736794188555456841, []string{"http://10.0.0.1:2379", "http://128.193.4.20:2379"}, "mem1", nil), + newTestMember(5674507346857578431, []string{"http://10.0.0.2:2379"}, "mem2", nil), + newTestMember(2676999861503984872, []string{"http://127.0.0.1:2379"}, "default", nil), }, }, } @@ -203,9 +197,9 @@ func TestClusterSetBad(t *testing.T) { func TestClusterIDs(t *testing.T) { cs := Cluster{} cs.AddSlice([]Member{ - {ID: 1}, - {ID: 4}, - {ID: 100}, + newTestMember(1, nil, "", nil), + newTestMember(4, nil, "", nil), + newTestMember(100, nil, "", nil), }) w := []uint64{1, 4, 100} g := cs.IDs() @@ -217,14 +211,14 @@ func TestClusterIDs(t *testing.T) { func TestClusterAddBad(t *testing.T) { // Should not be possible to add the same ID multiple times mems := []Member{ - {ID: 1, Name: "mem1"}, - {ID: 1, Name: "mem2"}, + newTestMember(1, nil, "mem1", nil), + newTestMember(1, nil, "mem2", nil), } c := &Cluster{} - c.Add(Member{ID: 1, Name: "mem1"}) + c.Add(newTestMember(1, nil, "mem1", nil)) for i, m := range mems { if err := c.Add(m); err == nil { - t.Errorf("#%d: set = %v, want err", i, m) + t.Errorf("#%d: set = %v, want err", i, err) } } } @@ -237,7 +231,7 @@ func TestClusterPeerURLs(t *testing.T) { // single peer with a single address { mems: []Member{ - {ID: 1, PeerURLs: []string{"http://192.0.2.1"}}, + newTestMember(1, []string{"http://192.0.2.1"}, "", nil), }, wurls: []string{"http://192.0.2.1"}, }, @@ -245,7 +239,7 @@ func TestClusterPeerURLs(t *testing.T) { // single peer with a single address with a port { mems: []Member{ - {ID: 1, PeerURLs: []string{"http://192.0.2.1:8001"}}, + newTestMember(1, []string{"http://192.0.2.1:8001"}, "", nil), }, wurls: []string{"http://192.0.2.1:8001"}, }, @@ -253,9 +247,9 @@ func TestClusterPeerURLs(t *testing.T) { // several members explicitly unsorted { mems: []Member{ - {ID: 2, PeerURLs: []string{"http://192.0.2.3", "http://192.0.2.4"}}, - {ID: 3, PeerURLs: []string{"http://192.0.2.5", "http://192.0.2.6"}}, - {ID: 1, PeerURLs: []string{"http://192.0.2.1", "http://192.0.2.2"}}, + newTestMember(2, []string{"http://192.0.2.3", "http://192.0.2.4"}, "", nil), + newTestMember(3, []string{"http://192.0.2.5", "http://192.0.2.6"}, "", nil), + newTestMember(1, []string{"http://192.0.2.1", "http://192.0.2.2"}, "", nil), }, wurls: []string{"http://192.0.2.1", "http://192.0.2.2", "http://192.0.2.3", "http://192.0.2.4", "http://192.0.2.5", "http://192.0.2.6"}, }, @@ -269,7 +263,7 @@ func TestClusterPeerURLs(t *testing.T) { // peer with no peer urls { mems: []Member{ - {ID: 3, PeerURLs: []string{}}, + newTestMember(3, []string{}, "", nil), }, wurls: []string{}, }, @@ -296,7 +290,7 @@ func TestClusterClientURLs(t *testing.T) { // single peer with a single address { mems: []Member{ - {ID: 1, ClientURLs: []string{"http://192.0.2.1"}}, + newTestMember(1, nil, "", []string{"http://192.0.2.1"}), }, wurls: []string{"http://192.0.2.1"}, }, @@ -304,7 +298,7 @@ func TestClusterClientURLs(t *testing.T) { // single peer with a single address with a port { mems: []Member{ - {ID: 1, ClientURLs: []string{"http://192.0.2.1:8001"}}, + newTestMember(1, nil, "", []string{"http://192.0.2.1:8001"}), }, wurls: []string{"http://192.0.2.1:8001"}, }, @@ -312,9 +306,9 @@ func TestClusterClientURLs(t *testing.T) { // several members explicitly unsorted { mems: []Member{ - {ID: 2, ClientURLs: []string{"http://192.0.2.3", "http://192.0.2.4"}}, - {ID: 3, ClientURLs: []string{"http://192.0.2.5", "http://192.0.2.6"}}, - {ID: 1, ClientURLs: []string{"http://192.0.2.1", "http://192.0.2.2"}}, + newTestMember(2, nil, "", []string{"http://192.0.2.3", "http://192.0.2.4"}), + newTestMember(3, nil, "", []string{"http://192.0.2.5", "http://192.0.2.6"}), + newTestMember(1, nil, "", []string{"http://192.0.2.1", "http://192.0.2.2"}), }, wurls: []string{"http://192.0.2.1", "http://192.0.2.2", "http://192.0.2.3", "http://192.0.2.4", "http://192.0.2.5", "http://192.0.2.6"}, }, @@ -328,7 +322,7 @@ func TestClusterClientURLs(t *testing.T) { // peer with no client urls { mems: []Member{ - {ID: 3, ClientURLs: []string{}}, + newTestMember(3, nil, "", []string{}), }, wurls: []string{}, }, diff --git a/etcdserver/etcdhttp/http_test.go b/etcdserver/etcdhttp/http_test.go index 03019c2b0..0233425be 100644 --- a/etcdserver/etcdhttp/http_test.go +++ b/etcdserver/etcdhttp/http_test.go @@ -614,9 +614,9 @@ func TestV2MachinesEndpoint(t *testing.T) { func TestServeMachines(t *testing.T) { cluster := &fakeCluster{ members: []etcdserver.Member{ - {ID: 0xBEEF0, ClientURLs: []string{"http://localhost:8080"}}, - {ID: 0xBEEF1, ClientURLs: []string{"http://localhost:8081"}}, - {ID: 0xBEEF2, ClientURLs: []string{"http://localhost:8082"}}, + {ID: 0xBEEF0, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080"}}}, + {ID: 0xBEEF1, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8081"}}}, + {ID: 0xBEEF2, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8082"}}}, }, } diff --git a/etcdserver/member.go b/etcdserver/member.go index afe4edab2..510702cf0 100644 --- a/etcdserver/member.go +++ b/etcdserver/member.go @@ -4,6 +4,7 @@ import ( "crypto/sha1" "encoding/binary" "fmt" + "log" "path" "strconv" "time" @@ -13,18 +14,31 @@ import ( const machineKVPrefix = "/_etcd/machines/" -type Member struct { - ID uint64 - Name string +// RaftAttributes represents the raft related attributes of an etcd member. +type RaftAttributes struct { // TODO(philips): ensure these are URLs - PeerURLs []string + PeerURLs []string +} + +// Attributes represents all the non-raft related attributes of an etcd member. +type Attributes struct { + Name string ClientURLs []string } +type Member struct { + ID uint64 + RaftAttributes + Attributes +} + // newMember creates a Member without an ID and generates one based on the // name, peer URLs. This is used for bootstrapping. func newMember(name string, peerURLs types.URLs, now *time.Time) *Member { - m := &Member{Name: name, PeerURLs: peerURLs.StringSlice()} + m := &Member{ + RaftAttributes: RaftAttributes{PeerURLs: peerURLs.StringSlice()}, + Attributes: Attributes{Name: name}, + } b := []byte(m.Name) for _, p := range m.PeerURLs { @@ -43,3 +57,11 @@ func newMember(name string, peerURLs types.URLs, now *time.Time) *Member { func (m Member) storeKey() string { return path.Join(machineKVPrefix, strconv.FormatUint(m.ID, 16)) } + +func parseMemberID(key string) uint64 { + id, err := strconv.ParseUint(path.Base(key), 16, 64) + if err != nil { + log.Panicf("unexpected parse member id error: %v", err) + } + return id +} diff --git a/etcdserver/server.go b/etcdserver/server.go index 73c9e30ee..2fc1aaea7 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -12,7 +12,6 @@ import ( "github.com/coreos/etcd/discovery" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" - "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/snap" @@ -154,15 +153,15 @@ func NewServer(cfg *ServerConfig) *EtcdServer { cls := NewClusterStore(st, *cfg.Cluster) s := &EtcdServer{ - store: st, - node: n, - name: cfg.Name, + store: st, + node: n, + id: m.ID, + attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, storage: struct { *wal.WAL *snap.Snapshotter }{w, ss}, send: Sender(cfg.Transport, cls), - clientURLs: cfg.ClientURLs, ticker: time.Tick(100 * time.Millisecond), syncTicker: time.Tick(500 * time.Millisecond), snapCount: cfg.SnapCount, @@ -175,8 +174,8 @@ func NewServer(cfg *ServerConfig) *EtcdServer { type EtcdServer struct { w wait.Wait done chan struct{} - name string - clientURLs types.URLs + id uint64 + attributes Attributes ClusterStore ClusterStore @@ -444,11 +443,8 @@ func (s *EtcdServer) sync(timeout time.Duration) { // static clientURLs of the server. // The function keeps attempting to register until it succeeds, // or its server is stopped. -// TODO: take care of info fetched from cluster store after having reconfig. func (s *EtcdServer) publish(retryInterval time.Duration) { - m := *s.ClusterStore.Get().FindName(s.name) - m.ClientURLs = s.clientURLs.StringSlice() - b, err := json.Marshal(m) + b, err := json.Marshal(s.attributes) if err != nil { log.Printf("etcdserver: json marshal error: %v", err) return @@ -456,7 +452,7 @@ func (s *EtcdServer) publish(retryInterval time.Duration) { req := pb.Request{ ID: int64(GenID()), Method: "PUT", - Path: m.storeKey(), + Path: Member{ID: s.id}.storeKey() + attributesSuffix, Val: string(b), } @@ -466,7 +462,7 @@ func (s *EtcdServer) publish(retryInterval time.Duration) { cancel() switch err { case nil: - log.Printf("etcdserver: published %+v to the cluster", m) + log.Printf("etcdserver: published %+v to the cluster", s.attributes) return case ErrStopped: log.Printf("etcdserver: aborting publish because server is stopped") diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index a8146af6c..b34d97414 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" "math/rand" - "net/url" "reflect" "sync" "testing" @@ -798,7 +797,7 @@ func TestAddMember(t *testing.T) { ClusterStore: cs, } s.start() - m := Member{ID: 1, PeerURLs: []string{"foo"}} + m := Member{ID: 1, RaftAttributes: RaftAttributes{PeerURLs: []string{"foo"}}} s.AddMember(context.TODO(), m) gaction := n.Action() s.Stop() @@ -864,17 +863,15 @@ func TestServerStopItself(t *testing.T) { func TestPublish(t *testing.T) { n := &nodeProposeDataRecorder{} - cs := mustClusterStore(t, []Member{{ID: 1, Name: "node1"}}) ch := make(chan interface{}, 1) // simulate that request has gone through consensus ch <- Response{} w := &waitWithResponse{ch: ch} srv := &EtcdServer{ - name: "node1", - clientURLs: []url.URL{{Scheme: "http", Host: "a"}, {Scheme: "http", Host: "b"}}, - node: n, - ClusterStore: cs, - w: w, + id: 1, + attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}, + node: n, + w: w, } srv.publish(time.Hour) @@ -889,28 +886,25 @@ func TestPublish(t *testing.T) { if r.Method != "PUT" { t.Errorf("method = %s, want PUT", r.Method) } - wm := Member{ID: 1, Name: "node1", ClientURLs: []string{"http://a", "http://b"}} - if r.Path != wm.storeKey() { - t.Errorf("path = %s, want %s", r.Path, wm.storeKey()) + wm := Member{ID: 1, Attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}} + if r.Path != wm.storeKey()+attributesSuffix { + t.Errorf("path = %s, want %s", r.Path, wm.storeKey()+attributesSuffix) } - var gm Member - if err := json.Unmarshal([]byte(r.Val), &gm); err != nil { + var gattr Attributes + if err := json.Unmarshal([]byte(r.Val), &gattr); err != nil { t.Fatalf("unmarshal val error: %v", err) } - if !reflect.DeepEqual(gm, wm) { - t.Errorf("member = %v, want %v", gm, wm) + if !reflect.DeepEqual(gattr, wm.Attributes) { + t.Errorf("member = %v, want %v", gattr, wm.Attributes) } } // TestPublishStopped tests that publish will be stopped if server is stopped. func TestPublishStopped(t *testing.T) { - cs := mustClusterStore(t, []Member{{ID: 1, Name: "node1"}}) srv := &EtcdServer{ - name: "node1", - node: &nodeRecorder{}, - ClusterStore: cs, - w: &waitRecorder{}, - done: make(chan struct{}), + node: &nodeRecorder{}, + w: &waitRecorder{}, + done: make(chan struct{}), } srv.Stop() srv.publish(time.Hour) @@ -919,21 +913,18 @@ func TestPublishStopped(t *testing.T) { // TestPublishRetry tests that publish will keep retry until success. func TestPublishRetry(t *testing.T) { n := &nodeRecorder{} - cs := mustClusterStore(t, []Member{{ID: 1, Name: "node1"}}) srv := &EtcdServer{ - name: "node1", - node: n, - ClusterStore: cs, - w: &waitRecorder{}, - done: make(chan struct{}), + node: n, + w: &waitRecorder{}, + done: make(chan struct{}), } time.AfterFunc(500*time.Microsecond, srv.Stop) srv.publish(10 * time.Nanosecond) action := n.Action() - // multiple Propose + Stop - if len(action) < 3 { - t.Errorf("len(action) = %d, want >= 3", action) + // multiple Proposes + if len(action) < 2 { + t.Errorf("len(action) = %d, want >= 2", action) } } @@ -1258,11 +1249,3 @@ func (cs *clusterStoreRecorder) Get() Cluster { func (cs *clusterStoreRecorder) Remove(id uint64) { cs.record(action{name: "Remove", params: []interface{}{id}}) } - -func mustClusterStore(t *testing.T, membs []Member) ClusterStore { - c := Cluster{} - if err := c.AddSlice(membs); err != nil { - t.Fatalf("error creating cluster from %v: %v", membs, err) - } - return NewClusterStore(&getAllStore{}, c) -}