Merge pull request #1234 from unihorn/152

etcdserver: save PeerURLs and Others into two different keys
Yicheng Qin 2014-10-10 12:21:32 -07:00
commit f16a272898
7 changed files with 253 additions and 148 deletions
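This commit splits each persisted Member into its raft-critical attributes (PeerURLs) and everything else (Name, ClientURLs), stored under two separate child keys so the two halves can be written independently. A minimal, self-contained sketch of the resulting layout; the struct definitions mirror the ones added to member.go in this diff, and the member ID 1 is illustrative:

package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of the two attribute groups introduced in member.go.
type RaftAttributes struct {
	PeerURLs []string
}

type Attributes struct {
	Name       string
	ClientURLs []string
}

func main() {
	ra, _ := json.Marshal(RaftAttributes{})
	attr, _ := json.Marshal(Attributes{Name: "node1"})
	// Each member now occupies a directory with one child key per group:
	fmt.Printf("/_etcd/machines/1/raftAttributes -> %s\n", ra)   // {"PeerURLs":null}
	fmt.Printf("/_etcd/machines/1/attributes     -> %s\n", attr) // {"Name":"node1","ClientURLs":null}
}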

View File

@@ -7,12 +7,16 @@ import (
"log"
"net/http"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/store"
)
const (
raftPrefix = "/raft"
raftAttributesSuffix = "/raftAttributes"
attributesSuffix = "/attributes"
)
type ClusterStore interface {
@@ -36,13 +40,20 @@ func NewClusterStore(st store.Store, c Cluster) ClusterStore {
// Add puts a new Member into the store.
// A Member with a matching id must not exist.
func (s *clusterStore) Add(m Member) {
b, err := json.Marshal(m)
b, err := json.Marshal(m.RaftAttributes)
if err != nil {
log.Panicf("marshal peer info error: %v", err)
log.Panicf("marshal error: %v", err)
}
if _, err := s.Store.Create(m.storeKey()+raftAttributesSuffix, false, string(b), false, store.Permanent); err != nil {
log.Panicf("add raftAttributes should never fail: %v", err)
}
if _, err := s.Store.Create(m.storeKey(), false, string(b), false, store.Permanent); err != nil {
log.Panicf("add member should never fail: %v", err)
b, err = json.Marshal(m.Attributes)
if err != nil {
log.Panicf("marshal error: %v", err)
}
if _, err := s.Store.Create(m.storeKey()+attributesSuffix, false, string(b), false, store.Permanent); err != nil {
log.Panicf("add attributes should never fail: %v", err)
}
}
@@ -50,28 +61,52 @@ func (s *clusterStore) Add(m Member) {
// lock here.
func (s *clusterStore) Get() Cluster {
c := &Cluster{}
e, err := s.Store.Get(machineKVPrefix, true, false)
e, err := s.Store.Get(machineKVPrefix, true, true)
if err != nil {
if v, ok := err.(*etcdErr.Error); ok && v.ErrorCode == etcdErr.EcodeKeyNotFound {
return *c
}
log.Panicf("get member should never fail: %v", err)
}
for _, n := range e.Node.Nodes {
m := Member{}
if err := json.Unmarshal([]byte(*n.Value), &m); err != nil {
log.Panicf("unmarshal peer error: %v", err)
}
err := c.Add(m)
m, err := nodeToMember(n)
if err != nil {
log.Panicf("unexpected nodeToMember error: %v", err)
}
if err := c.Add(m); err != nil {
log.Panicf("add member to cluster should never fail: %v", err)
}
}
return *c
}
// nodeToMember builds a member from a store node.
// The child nodes of the given node must be sorted by key.
func nodeToMember(n *store.NodeExtern) (Member, error) {
m := Member{ID: parseMemberID(n.Key)}
if len(n.Nodes) != 2 {
return m, fmt.Errorf("len(nodes) = %d, want 2", len(n.Nodes))
}
if w := n.Key + attributesSuffix; n.Nodes[0].Key != w {
return m, fmt.Errorf("key = %v, want %v", n.Nodes[0].Key, w)
}
if err := json.Unmarshal([]byte(*n.Nodes[0].Value), &m.Attributes); err != nil {
return m, fmt.Errorf("unmarshal attributes error: %v", err)
}
if w := n.Key + raftAttributesSuffix; n.Nodes[1].Key != w {
return m, fmt.Errorf("key = %v, want %v", n.Nodes[1].Key, w)
}
if err := json.Unmarshal([]byte(*n.Nodes[1].Value), &m.RaftAttributes); err != nil {
return m, fmt.Errorf("unmarshal raftAttributes error: %v", err)
}
return m, nil
}
// Remove removes a member from the store.
// The given id MUST exist.
func (s *clusterStore) Remove(id uint64) {
p := s.Get().FindID(id).storeKey()
if _, err := s.Store.Delete(p, false, false); err != nil {
if _, err := s.Store.Delete(p, true, true); err != nil {
log.Panicf("delete peer should never fail: %v", err)
}
}
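Two details in the file above are easy to miss. Get now requests a recursive, sorted listing (Get(machineKVPrefix, true, true)), and Remove deletes the member's directory recursively (Delete(p, true, true)), because a member now occupies a directory with two children instead of a single key. Sorting matters because "/attributes" sorts lexically before "/raftAttributes", which is what lets nodeToMember index Nodes[0] and Nodes[1] directly rather than matching suffixes. A sketch of the parsing path, assuming it sits in package etcdserver next to nodeToMember (the helper name is illustrative; hex key 1234 parses to ID 4660):

package etcdserver

import (
	"fmt"

	"github.com/coreos/etcd/store"
)

// exampleNodeToMember is illustrative and not part of this commit.
func exampleNodeToMember() {
	attrs := `{"Name":"node1","ClientURLs":null}`
	raftAttrs := `{"PeerURLs":null}`
	n := &store.NodeExtern{Key: "/1234", Nodes: []*store.NodeExtern{
		{Key: "/1234/attributes", Value: &attrs},         // sorts first
		{Key: "/1234/raftAttributes", Value: &raftAttrs}, // sorts second
	}}
	m, err := nodeToMember(n)
	fmt.Println(m.ID, m.Name, err) // 4660 node1 <nil>
}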

View File

@@ -12,22 +12,32 @@ import (
func TestClusterStoreAdd(t *testing.T) {
st := &storeRecorder{}
ps := &clusterStore{Store: st}
ps.Add(Member{Name: "node", ID: 1})
ps.Add(newTestMember(1, nil, "node1", nil))
wactions := []action{
{
name: "Create",
params: []interface{}{
machineKVPrefix + "1",
machineKVPrefix + "1/raftAttributes",
false,
`{"ID":1,"Name":"node","PeerURLs":null,"ClientURLs":null}`,
`{"PeerURLs":null}`,
false,
store.Permanent,
},
},
{
name: "Create",
params: []interface{}{
machineKVPrefix + "1/attributes",
false,
`{"Name":"node1","ClientURLs":null}`,
false,
store.Permanent,
},
},
}
if g := st.Action(); !reflect.DeepEqual(g, wactions) {
t.Error("actions = %v, want %v", g, wactions)
t.Errorf("actions = %v, want %v", g, wactions)
}
}
@@ -37,20 +47,32 @@ func TestClusterStoreGet(t *testing.T) {
wmems []Member
}{
{
[]Member{{Name: "node1", ID: 1}},
[]Member{{Name: "node1", ID: 1}},
[]Member{newTestMember(1, nil, "node1", nil)},
[]Member{newTestMember(1, nil, "node1", nil)},
},
{
[]Member{},
[]Member{},
},
{
[]Member{{Name: "node1", ID: 1}, {Name: "node2", ID: 2}},
[]Member{{Name: "node1", ID: 1}, {Name: "node2", ID: 2}},
[]Member{
newTestMember(1, nil, "node1", nil),
newTestMember(2, nil, "node2", nil),
},
[]Member{
newTestMember(1, nil, "node1", nil),
newTestMember(2, nil, "node2", nil),
},
},
{
[]Member{{Name: "node2", ID: 2}, {Name: "node1", ID: 1}},
[]Member{{Name: "node1", ID: 1}, {Name: "node2", ID: 2}},
[]Member{
newTestMember(2, nil, "node2", nil),
newTestMember(1, nil, "node1", nil),
},
[]Member{
newTestMember(1, nil, "node1", nil),
newTestMember(2, nil, "node2", nil),
},
},
}
for i, tt := range tests {
@@ -60,7 +82,7 @@ func TestClusterStoreGet(t *testing.T) {
t.Error(err)
}
cs := NewClusterStore(&getAllStore{}, c)
cs := NewClusterStore(newGetAllStore(), c)
if g := cs.Get(); !reflect.DeepEqual(g, c) {
t.Errorf("#%d: mems = %v, want %v", i, g, c)
@@ -69,9 +91,9 @@ func TestClusterStoreGet(t *testing.T) {
}
func TestClusterStoreDelete(t *testing.T) {
st := &storeGetAllDeleteRecorder{}
st := newStoreGetAllAndDeleteRecorder()
c := Cluster{}
c.Add(Member{Name: "node", ID: 1})
c.Add(newTestMember(1, nil, "node1", nil))
cs := NewClusterStore(st, c)
cs.Remove(1)
@@ -81,6 +103,53 @@ func TestClusterStoreDelete(t *testing.T) {
}
}
func TestNodeToMemberFail(t *testing.T) {
tests := []*store.NodeExtern{
{Key: "/1234", Nodes: []*store.NodeExtern{
{Key: "/1234/strange"},
}},
{Key: "/1234", Nodes: []*store.NodeExtern{
{Key: "/1234/dynamic", Value: stringp("garbage")},
}},
{Key: "/1234", Nodes: []*store.NodeExtern{
{Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)},
}},
{Key: "/1234", Nodes: []*store.NodeExtern{
{Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)},
{Key: "/1234/strange"},
}},
{Key: "/1234", Nodes: []*store.NodeExtern{
{Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)},
{Key: "/1234/static", Value: stringp("garbage")},
}},
{Key: "/1234", Nodes: []*store.NodeExtern{
{Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)},
{Key: "/1234/static", Value: stringp(`{"Name":"node1","ClientURLs":null}`)},
{Key: "/1234/strange"},
}},
}
for i, tt := range tests {
if _, err := nodeToMember(tt); err == nil {
t.Errorf("#%d: unexpected nil error", i)
}
}
}
func TestNodeToMember(t *testing.T) {
n := &store.NodeExtern{Key: "/1234", Nodes: []*store.NodeExtern{
{Key: "/1234/attributes", Value: stringp(`{"Name":"node1","ClientURLs":null}`)},
{Key: "/1234/raftAttributes", Value: stringp(`{"PeerURLs":null}`)},
}}
wm := Member{ID: 0x1234, RaftAttributes: RaftAttributes{}, Attributes: Attributes{Name: "node1"}}
m, err := nodeToMember(n)
if err != nil {
t.Fatalf("unexpected nodeToMember error: %v", err)
}
if !reflect.DeepEqual(m, wm) {
t.Errorf("member = %+v, want %+v", m, wm)
}
}
// simpleStore implements basic create and get.
type simpleStore struct {
storeRecorder
@@ -103,35 +172,41 @@ func (s *simpleStore) Get(key string, _, _ bool) (*store.Event, error) {
return ev, nil
}
// getAllStore inherits simpleStore, and makes Get return all keys.
// getAllStore wraps a real store.Store, and makes Get return all keys sorted.
// It uses a real store because Get depends on store logic that is not easy
// to mock.
// TODO: use mock one to do testing
type getAllStore struct {
simpleStore
store.Store
}
func (s *getAllStore) Get(_ string, _, _ bool) (*store.Event, error) {
nodes := make([]*store.NodeExtern, 0)
for k, v := range s.st {
nodes = append(nodes, &store.NodeExtern{Key: k, Value: stringp(v)})
func newGetAllStore() *getAllStore {
return &getAllStore{store.New()}
}
type storeGetAllAndDeleteRecorder struct {
*getAllStore
deletes []string
}
func newStoreGetAllAndDeleteRecorder() *storeGetAllAndDeleteRecorder {
return &storeGetAllAndDeleteRecorder{getAllStore: newGetAllStore()}
}
func (s *storeGetAllAndDeleteRecorder) Delete(key string, _, _ bool) (*store.Event, error) {
s.deletes = append(s.deletes, key)
return nil, nil
}
func newTestMember(id uint64, peerURLs []string, name string, clientURLs []string) Member {
return Member{
ID: id,
RaftAttributes: RaftAttributes{PeerURLs: peerURLs},
Attributes: Attributes{Name: name, ClientURLs: clientURLs},
}
return &store.Event{Node: &store.NodeExtern{Nodes: nodes}}, nil
}
type storeDeleteRecorder struct {
storeRecorder
deletes []string
}
func (s *storeDeleteRecorder) Delete(key string, _, _ bool) (*store.Event, error) {
s.deletes = append(s.deletes, key)
return nil, nil
}
type storeGetAllDeleteRecorder struct {
getAllStore
deletes []string
}
func (s *storeGetAllDeleteRecorder) Delete(key string, _, _ bool) (*store.Event, error) {
s.deletes = append(s.deletes, key)
return nil, nil
func newTestMemberp(id uint64, peerURLs []string, name string, clientURLs []string) *Member {
m := newTestMember(id, peerURLs, name, clientURLs)
return &m
}

View File

@@ -18,19 +18,13 @@ func TestClusterAddSlice(t *testing.T) {
},
{
[]Member{
{ID: 1, PeerURLs: []string{"foo", "bar"}},
{ID: 2, PeerURLs: []string{"baz"}},
newTestMember(1, []string{"foo", "bar"}, "", nil),
newTestMember(2, []string{"baz"}, "", nil),
},
&Cluster{
1: &Member{
ID: 1,
PeerURLs: []string{"foo", "bar"},
},
2: &Member{
ID: 2,
PeerURLs: []string{"baz"},
},
1: newTestMemberp(1, []string{"foo", "bar"}, "", nil),
2: newTestMemberp(2, []string{"baz"}, "", nil),
},
},
}
@@ -48,18 +42,18 @@ func TestClusterAddSlice(t *testing.T) {
func TestClusterAddSliceBad(t *testing.T) {
c := Cluster{
1: &Member{ID: 1},
1: newTestMemberp(1, nil, "", nil),
}
if err := c.AddSlice([]Member{{ID: 1}}); err == nil {
if err := c.AddSlice([]Member{newTestMember(1, nil, "", nil)}); err == nil {
t.Error("want err, but got nil")
}
}
func TestClusterPick(t *testing.T) {
cs := Cluster{
1: &Member{ID: 1, PeerURLs: []string{"abc", "def", "ghi", "jkl", "mno", "pqr", "stu"}},
2: &Member{ID: 2, PeerURLs: []string{"xyz"}},
3: &Member{ID: 3, PeerURLs: []string{}},
1: newTestMemberp(1, []string{"abc", "def", "ghi", "jkl", "mno", "pqr", "stu"}, "", nil),
2: newTestMemberp(2, []string{"xyz"}, "", nil),
3: newTestMemberp(3, []string{}, "", nil),
}
ids := map[string]bool{
"abc": true,
@@ -98,7 +92,7 @@ func TestClusterFind(t *testing.T) {
{
1,
"node1",
[]Member{{Name: "node1", ID: 1}},
[]Member{newTestMember(1, nil, "node1", nil)},
true,
},
{
@@ -110,13 +104,13 @@ func TestClusterFind(t *testing.T) {
{
2,
"node2",
[]Member{{Name: "node1", ID: 1}, {Name: "node2", ID: 2}},
[]Member{newTestMember(1, nil, "node1", nil), newTestMember(2, nil, "node2", nil)},
true,
},
{
3,
"node3",
[]Member{{Name: "node1", ID: 1}, {Name: "node2", ID: 2}},
[]Member{newTestMember(1, nil, "node1", nil), newTestMember(2, nil, "node2", nil)},
false,
},
}
@@ -161,9 +155,9 @@ func TestClusterSet(t *testing.T) {
{
"mem1=http://10.0.0.1:2379,mem1=http://128.193.4.20:2379,mem2=http://10.0.0.2:2379,default=http://127.0.0.1:2379",
[]Member{
{ID: 3736794188555456841, Name: "mem1", PeerURLs: []string{"http://10.0.0.1:2379", "http://128.193.4.20:2379"}},
{ID: 5674507346857578431, Name: "mem2", PeerURLs: []string{"http://10.0.0.2:2379"}},
{ID: 2676999861503984872, Name: "default", PeerURLs: []string{"http://127.0.0.1:2379"}},
newTestMember(3736794188555456841, []string{"http://10.0.0.1:2379", "http://128.193.4.20:2379"}, "mem1", nil),
newTestMember(5674507346857578431, []string{"http://10.0.0.2:2379"}, "mem2", nil),
newTestMember(2676999861503984872, []string{"http://127.0.0.1:2379"}, "default", nil),
},
},
}
@@ -203,9 +197,9 @@ func TestClusterSetBad(t *testing.T) {
func TestClusterIDs(t *testing.T) {
cs := Cluster{}
cs.AddSlice([]Member{
{ID: 1},
{ID: 4},
{ID: 100},
newTestMember(1, nil, "", nil),
newTestMember(4, nil, "", nil),
newTestMember(100, nil, "", nil),
})
w := []uint64{1, 4, 100}
g := cs.IDs()
@@ -217,14 +211,14 @@ func TestClusterIDs(t *testing.T) {
func TestClusterAddBad(t *testing.T) {
// Should not be possible to add the same ID multiple times
mems := []Member{
{ID: 1, Name: "mem1"},
{ID: 1, Name: "mem2"},
newTestMember(1, nil, "mem1", nil),
newTestMember(1, nil, "mem2", nil),
}
c := &Cluster{}
c.Add(Member{ID: 1, Name: "mem1"})
c.Add(newTestMember(1, nil, "mem1", nil))
for i, m := range mems {
if err := c.Add(m); err == nil {
t.Errorf("#%d: set = %v, want err", i, m)
t.Errorf("#%d: set = %v, want err", i, err)
}
}
}
@@ -237,7 +231,7 @@ func TestClusterPeerURLs(t *testing.T) {
// single peer with a single address
{
mems: []Member{
{ID: 1, PeerURLs: []string{"http://192.0.2.1"}},
newTestMember(1, []string{"http://192.0.2.1"}, "", nil),
},
wurls: []string{"http://192.0.2.1"},
},
@@ -245,7 +239,7 @@ func TestClusterPeerURLs(t *testing.T) {
// single peer with a single address with a port
{
mems: []Member{
{ID: 1, PeerURLs: []string{"http://192.0.2.1:8001"}},
newTestMember(1, []string{"http://192.0.2.1:8001"}, "", nil),
},
wurls: []string{"http://192.0.2.1:8001"},
},
@@ -253,9 +247,9 @@ func TestClusterPeerURLs(t *testing.T) {
// several members explicitly unsorted
{
mems: []Member{
{ID: 2, PeerURLs: []string{"http://192.0.2.3", "http://192.0.2.4"}},
{ID: 3, PeerURLs: []string{"http://192.0.2.5", "http://192.0.2.6"}},
{ID: 1, PeerURLs: []string{"http://192.0.2.1", "http://192.0.2.2"}},
newTestMember(2, []string{"http://192.0.2.3", "http://192.0.2.4"}, "", nil),
newTestMember(3, []string{"http://192.0.2.5", "http://192.0.2.6"}, "", nil),
newTestMember(1, []string{"http://192.0.2.1", "http://192.0.2.2"}, "", nil),
},
wurls: []string{"http://192.0.2.1", "http://192.0.2.2", "http://192.0.2.3", "http://192.0.2.4", "http://192.0.2.5", "http://192.0.2.6"},
},
@@ -269,7 +263,7 @@ func TestClusterPeerURLs(t *testing.T) {
// peer with no peer urls
{
mems: []Member{
{ID: 3, PeerURLs: []string{}},
newTestMember(3, []string{}, "", nil),
},
wurls: []string{},
},
@@ -296,7 +290,7 @@ func TestClusterClientURLs(t *testing.T) {
// single peer with a single address
{
mems: []Member{
{ID: 1, ClientURLs: []string{"http://192.0.2.1"}},
newTestMember(1, nil, "", []string{"http://192.0.2.1"}),
},
wurls: []string{"http://192.0.2.1"},
},
@@ -304,7 +298,7 @@ func TestClusterClientURLs(t *testing.T) {
// single peer with a single address with a port
{
mems: []Member{
{ID: 1, ClientURLs: []string{"http://192.0.2.1:8001"}},
newTestMember(1, nil, "", []string{"http://192.0.2.1:8001"}),
},
wurls: []string{"http://192.0.2.1:8001"},
},
@@ -312,9 +306,9 @@ func TestClusterClientURLs(t *testing.T) {
// several members explicitly unsorted
{
mems: []Member{
{ID: 2, ClientURLs: []string{"http://192.0.2.3", "http://192.0.2.4"}},
{ID: 3, ClientURLs: []string{"http://192.0.2.5", "http://192.0.2.6"}},
{ID: 1, ClientURLs: []string{"http://192.0.2.1", "http://192.0.2.2"}},
newTestMember(2, nil, "", []string{"http://192.0.2.3", "http://192.0.2.4"}),
newTestMember(3, nil, "", []string{"http://192.0.2.5", "http://192.0.2.6"}),
newTestMember(1, nil, "", []string{"http://192.0.2.1", "http://192.0.2.2"}),
},
wurls: []string{"http://192.0.2.1", "http://192.0.2.2", "http://192.0.2.3", "http://192.0.2.4", "http://192.0.2.5", "http://192.0.2.6"},
},
@@ -328,7 +322,7 @@ func TestClusterClientURLs(t *testing.T) {
// peer with no client urls
{
mems: []Member{
{ID: 3, ClientURLs: []string{}},
newTestMember(3, nil, "", []string{}),
},
wurls: []string{},
},

View File

@@ -614,9 +614,9 @@ func TestV2MachinesEndpoint(t *testing.T) {
func TestServeMachines(t *testing.T) {
cluster := &fakeCluster{
members: []etcdserver.Member{
{ID: 0xBEEF0, ClientURLs: []string{"http://localhost:8080"}},
{ID: 0xBEEF1, ClientURLs: []string{"http://localhost:8081"}},
{ID: 0xBEEF2, ClientURLs: []string{"http://localhost:8082"}},
{ID: 0xBEEF0, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080"}}},
{ID: 0xBEEF1, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8081"}}},
{ID: 0xBEEF2, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8082"}}},
},
}

View File

@@ -4,6 +4,7 @@ import (
"crypto/sha1"
"encoding/binary"
"fmt"
"log"
"path"
"strconv"
"time"
@@ -13,18 +14,31 @@ import (
const machineKVPrefix = "/_etcd/machines/"
type Member struct {
ID uint64
Name string
// RaftAttributes represents the raft-related attributes of an etcd member.
type RaftAttributes struct {
// TODO(philips): ensure these are URLs
PeerURLs []string
PeerURLs []string
}
// Attributes represents all the non-raft-related attributes of an etcd member.
type Attributes struct {
Name string
ClientURLs []string
}
type Member struct {
ID uint64
RaftAttributes
Attributes
}
// newMember creates a Member without an ID and generates one based on the
// name and peer URLs. This is used for bootstrapping.
func newMember(name string, peerURLs types.URLs, now *time.Time) *Member {
m := &Member{Name: name, PeerURLs: peerURLs.StringSlice()}
m := &Member{
RaftAttributes: RaftAttributes{PeerURLs: peerURLs.StringSlice()},
Attributes: Attributes{Name: name},
}
b := []byte(m.Name)
for _, p := range m.PeerURLs {
@@ -43,3 +57,11 @@ func newMember(name string, peerURLs types.URLs, now *time.Time) *Member {
func (m Member) storeKey() string {
return path.Join(machineKVPrefix, strconv.FormatUint(m.ID, 16))
}
func parseMemberID(key string) uint64 {
id, err := strconv.ParseUint(path.Base(key), 16, 64)
if err != nil {
log.Panicf("unexpected parse member id error: %v", err)
}
return id
}
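A quick roundtrip sketch for the two helpers above, assuming it lives in package etcdserver (the helper name is illustrative): storeKey writes the ID as lowercase hex under machineKVPrefix, and parseMemberID recovers the ID from any key whose base name is that hex string.

package etcdserver

import "fmt"

// exampleMemberKey is illustrative and not part of this commit.
func exampleMemberKey() {
	m := Member{ID: 0x1234}
	k := m.storeKey()                // "/_etcd/machines/1234"
	fmt.Println(k, parseMemberID(k)) // /_etcd/machines/1234 4660
}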

View File

@@ -12,7 +12,6 @@ import (
"github.com/coreos/etcd/discovery"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
@@ -163,15 +162,15 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
cls := NewClusterStore(st, *cfg.Cluster)
s := &EtcdServer{
store: st,
node: n,
name: cfg.Name,
store: st,
node: n,
id: m.ID,
attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
storage: struct {
*wal.WAL
*snap.Snapshotter
}{w, ss},
send: Sender(cfg.Transport, cls),
clientURLs: cfg.ClientURLs,
ticker: time.Tick(100 * time.Millisecond),
syncTicker: time.Tick(500 * time.Millisecond),
snapCount: cfg.SnapCount,
@@ -184,8 +183,8 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
type EtcdServer struct {
w wait.Wait
done chan struct{}
name string
clientURLs types.URLs
id uint64
attributes Attributes
ClusterStore ClusterStore
@@ -453,11 +452,8 @@ func (s *EtcdServer) sync(timeout time.Duration) {
// static clientURLs of the server.
// The function keeps attempting to register until it succeeds,
// or its server is stopped.
// TODO: take care of info fetched from the cluster store after reconfiguration.
func (s *EtcdServer) publish(retryInterval time.Duration) {
m := *s.ClusterStore.Get().FindName(s.name)
m.ClientURLs = s.clientURLs.StringSlice()
b, err := json.Marshal(m)
b, err := json.Marshal(s.attributes)
if err != nil {
log.Printf("etcdserver: json marshal error: %v", err)
return
@@ -465,7 +461,7 @@ func (s *EtcdServer) publish(retryInterval time.Duration) {
req := pb.Request{
ID: int64(GenID()),
Method: "PUT",
Path: m.storeKey(),
Path: Member{ID: s.id}.storeKey() + attributesSuffix,
Val: string(b),
}
@@ -475,7 +471,7 @@ func (s *EtcdServer) publish(retryInterval time.Duration) {
cancel()
switch err {
case nil:
log.Printf("etcdserver: published %+v to the cluster", m)
log.Printf("etcdserver: published %+v to the cluster", s.attributes)
return
case ErrStopped:
log.Printf("etcdserver: aborting publish because server is stopped")

View File

@@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"math/rand"
"net/url"
"reflect"
"sync"
"testing"
@@ -798,7 +797,7 @@ func TestAddMember(t *testing.T) {
ClusterStore: cs,
}
s.start()
m := Member{ID: 1, PeerURLs: []string{"foo"}}
m := Member{ID: 1, RaftAttributes: RaftAttributes{PeerURLs: []string{"foo"}}}
s.AddMember(context.TODO(), m)
gaction := n.Action()
s.Stop()
@@ -864,17 +863,15 @@ func TestServerStopItself(t *testing.T) {
func TestPublish(t *testing.T) {
n := &nodeProposeDataRecorder{}
cs := mustClusterStore(t, []Member{{ID: 1, Name: "node1"}})
ch := make(chan interface{}, 1)
// simulate that request has gone through consensus
ch <- Response{}
w := &waitWithResponse{ch: ch}
srv := &EtcdServer{
name: "node1",
clientURLs: []url.URL{{Scheme: "http", Host: "a"}, {Scheme: "http", Host: "b"}},
node: n,
ClusterStore: cs,
w: w,
id: 1,
attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
node: n,
w: w,
}
srv.publish(time.Hour)
@@ -889,28 +886,25 @@ func TestPublish(t *testing.T) {
if r.Method != "PUT" {
t.Errorf("method = %s, want PUT", r.Method)
}
wm := Member{ID: 1, Name: "node1", ClientURLs: []string{"http://a", "http://b"}}
if r.Path != wm.storeKey() {
t.Errorf("path = %s, want %s", r.Path, wm.storeKey())
wm := Member{ID: 1, Attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}}
if r.Path != wm.storeKey()+attributesSuffix {
t.Errorf("path = %s, want %s", r.Path, wm.storeKey()+attributesSuffix)
}
var gm Member
if err := json.Unmarshal([]byte(r.Val), &gm); err != nil {
var gattr Attributes
if err := json.Unmarshal([]byte(r.Val), &gattr); err != nil {
t.Fatalf("unmarshal val error: %v", err)
}
if !reflect.DeepEqual(gm, wm) {
t.Errorf("member = %v, want %v", gm, wm)
if !reflect.DeepEqual(gattr, wm.Attributes) {
t.Errorf("member = %v, want %v", gattr, wm.Attributes)
}
}
// TestPublishStopped tests that publish will be stopped if server is stopped.
func TestPublishStopped(t *testing.T) {
cs := mustClusterStore(t, []Member{{ID: 1, Name: "node1"}})
srv := &EtcdServer{
name: "node1",
node: &nodeRecorder{},
ClusterStore: cs,
w: &waitRecorder{},
done: make(chan struct{}),
node: &nodeRecorder{},
w: &waitRecorder{},
done: make(chan struct{}),
}
srv.Stop()
srv.publish(time.Hour)
@@ -919,21 +913,18 @@ func TestPublishStopped(t *testing.T) {
// TestPublishRetry tests that publish will keep retry until success.
func TestPublishRetry(t *testing.T) {
n := &nodeRecorder{}
cs := mustClusterStore(t, []Member{{ID: 1, Name: "node1"}})
srv := &EtcdServer{
name: "node1",
node: n,
ClusterStore: cs,
w: &waitRecorder{},
done: make(chan struct{}),
node: n,
w: &waitRecorder{},
done: make(chan struct{}),
}
time.AfterFunc(500*time.Microsecond, srv.Stop)
srv.publish(10 * time.Nanosecond)
action := n.Action()
// multiple Propose + Stop
if len(action) < 3 {
t.Errorf("len(action) = %d, want >= 3", action)
// multiple Proposes
if len(action) < 2 {
t.Errorf("len(action) = %d, want >= 2", action)
}
}
@@ -1258,11 +1249,3 @@ func (cs *clusterStoreRecorder) Get() Cluster {
func (cs *clusterStoreRecorder) Remove(id uint64) {
cs.record(action{name: "Remove", params: []interface{}{id}})
}
func mustClusterStore(t *testing.T, membs []Member) ClusterStore {
c := Cluster{}
if err := c.AddSlice(membs); err != nil {
t.Fatalf("error creating cluster from %v: %v", membs, err)
}
return NewClusterStore(&getAllStore{}, c)
}