diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go
index 2019bc1e4..4406be439 100644
--- a/etcdserver/cluster.go
+++ b/etcdserver/cluster.go
@@ -19,28 +19,129 @@ package etcdserver
 
 import (
 	"crypto/sha1"
 	"encoding/binary"
+	"encoding/json"
 	"fmt"
-	"math/rand"
+	"log"
 	"net/url"
 	"sort"
 	"strings"
 
+	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/pkg/flags"
 	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/store"
 )
 
+const (
+	raftAttributesSuffix = "/raftAttributes"
+	attributesSuffix     = "/attributes"
+)
+
+type ClusterInfo interface {
+	ID() uint64
+	ClientURLs() []string
+}
+
 // Cluster is a list of Members that belong to the same raft cluster
 type Cluster struct {
 	id      uint64
 	name    string
 	members map[uint64]*Member
+	removed map[uint64]bool
+	store   store.Store
 }
 
-func NewCluster(clusterName string) *Cluster {
-	return &Cluster{name: clusterName, members: make(map[uint64]*Member)}
+// NewClusterFromString returns a new Cluster with the given name, parsing its
+// members from a comma-separated mapping of names to peer URLs (as supplied on
+// the command line or by discovery), formatted like:
+// mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach1=http://3.3.3.3,mach1=http://4.4.4.4
+func NewClusterFromString(name string, cluster string) (*Cluster, error) {
+	c := newCluster(name)
+
+	v, err := url.ParseQuery(strings.Replace(cluster, ",", "&", -1))
+	if err != nil {
+		return nil, err
+	}
+	for name, urls := range v {
+		if len(urls) == 0 || urls[0] == "" {
+			return nil, fmt.Errorf("Empty URL given for %q", name)
+		}
+		m := NewMember(name, types.URLs(*flags.NewURLsValue(strings.Join(urls, ","))), c.name, nil)
+		if _, ok := c.members[m.ID]; ok {
+			return nil, fmt.Errorf("Member exists with identical ID %v", m)
+		}
+		c.members[m.ID] = m
+	}
+	return c, nil
 }
 
-func (c Cluster) FindName(name string) *Member {
+type MemberInfo struct {
+	Name     string
+	PeerURLs types.URLs
+}
+
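// ---- editor's note (illustrative sketch, not part of the patch) ----
// A minimal usage sketch for NewClusterFromString above; repeating a name
// gives that member several peer URLs:
//
//	c, err := NewClusterFromString("etcd", "mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach1=http://3.3.3.3")
//	if err != nil {
//		log.Fatal(err)
//	}
//	// c.Members() now holds "mach0" (two peer URLs) and "mach1" (one).
//	// Members whose generated IDs collide (e.g. identical peer URLs) are
//	// rejected, as TestClusterFromStringBad exercises below.
// ---------------------------------------------------------------------
+// NewClusterFromMemberInfos returns a new Cluster with the given name, built
+// from the given MemberInfos.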
+func NewClusterFromMemberInfos(name string, infos []MemberInfo) (*Cluster, error) {
+	c := newCluster(name)
+	for _, info := range infos {
+		m := NewMember(info.Name, info.PeerURLs, c.name, nil)
+		if _, ok := c.members[m.ID]; ok {
+			return nil, fmt.Errorf("Member exists with identical ID %v", m)
+		}
+		c.members[m.ID] = m
+	}
+	return c, nil
+}
+
+func NewClusterFromStore(name string, st store.Store) *Cluster {
+	c := newCluster(name)
+	c.store = st
+
+	e, err := c.store.Get(storeMembersPrefix, true, true)
+	if err != nil {
+		if isKeyNotFound(err) {
+			return c
+		}
+		log.Panicf("get members should never fail: %v", err)
+	}
+	for _, n := range e.Node.Nodes {
+		m, err := nodeToMember(n)
+		if err != nil {
+			log.Panicf("unexpected nodeToMember error: %v", err)
+		}
+		c.members[m.ID] = m
+	}
+
+	e, err = c.store.Get(storeRemovedMembersPrefix, true, true)
+	if err != nil {
+		if isKeyNotFound(err) {
+			return c
+		}
+		log.Panicf("get removed members should never fail: %v", err)
+	}
+	for _, n := range e.Node.Nodes {
+		c.removed[parseMemberID(n.Key)] = true
+	}
+
+	return c
+}
+
+func newCluster(name string) *Cluster {
+	return &Cluster{
+		name:    name,
+		members: make(map[uint64]*Member),
+		removed: make(map[uint64]bool),
+	}
+}
+
+func (c Cluster) ID() uint64 { return c.id }
+
+func (c Cluster) Members() map[uint64]*Member { return c.members }
+
+func (c *Cluster) Member(id uint64) *Member {
+	return c.members[id]
+}
+
+func (c *Cluster) MemberFromName(name string) *Member {
 	for _, m := range c.members {
 		if m.Name == name {
 			return m
@@ -49,101 +150,6 @@ func (c Cluster) FindName(name string) *Member {
 	return nil
 }
 
-func (c Cluster) FindID(id uint64) *Member {
-	return c.members[id]
-}
-
-func (c Cluster) Add(m Member) error {
-	if c.FindID(m.ID) != nil {
-		return fmt.Errorf("Member exists with identical ID %v", m)
-	}
-	c.members[m.ID] = &m
-	return nil
-}
-
-func (c *Cluster) AddSlice(mems []Member) error {
-	for _, m := range mems {
-		err := c.Add(m)
-		if err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
-
-// Pick chooses a random address from a given Member's addresses, and returns it as
-// an addressible URI. If the given member does not exist, an empty string is returned.
-func (c Cluster) Pick(id uint64) string {
-	if m := c.FindID(id); m != nil {
-		urls := m.PeerURLs
-		if len(urls) == 0 {
-			return ""
-		}
-		return urls[rand.Intn(len(urls))]
-	}
-
-	return ""
-}
-
-// SetMembersFromString parses a sets of names to IPs either from the command line or discovery formatted like:
-// mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach0=http://1.1.1.1,mach1=http://2.2.2.2,mach1=http://3.3.3.3
-func (c *Cluster) SetMembersFromString(s string) error {
-	c.members = make(map[uint64]*Member)
-	v, err := url.ParseQuery(strings.Replace(s, ",", "&", -1))
-	if err != nil {
-		return err
-	}
-
-	for name, urls := range v {
-		if len(urls) == 0 || urls[0] == "" {
-			return fmt.Errorf("Empty URL given for %q", name)
-		}
-
-		m := NewMember(name, types.URLs(*flags.NewURLsValue(strings.Join(urls, ","))), c.name, nil)
-		err := c.Add(*m)
-		if err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-func (c *Cluster) AddMemberFromURLs(name string, urls types.URLs) (*Member, error) {
-	m := NewMember(name, urls, c.name, nil)
-	err := c.Add(*m)
-	if err != nil {
-		return nil, err
-	}
-	return m, nil
-}
-
-func (c *Cluster) GenID(salt []byte) {
-	mIDs := c.MemberIDs()
-	b := make([]byte, 8*len(mIDs))
-	for i, id := range mIDs {
-		binary.BigEndian.PutUint64(b[8*i:], id)
-	}
-	b = append(b, salt...)
-	hash := sha1.Sum(b)
-	c.id = binary.BigEndian.Uint64(hash[:8])
-}
-
-func (c Cluster) String() string {
-	sl := []string{}
-	for _, m := range c.members {
-		for _, u := range m.PeerURLs {
-			sl = append(sl, fmt.Sprintf("%s=%s", m.Name, u))
-		}
-	}
-	sort.Strings(sl)
-	return strings.Join(sl, ",")
-}
-
-func (c Cluster) ID() uint64 { return c.id }
-
-func (c Cluster) Members() map[uint64]*Member { return c.members }
-
 func (c Cluster) MemberIDs() []uint64 {
 	var ids []uint64
 	for _, m := range c.members {
@@ -153,6 +159,10 @@ func (c Cluster) MemberIDs() []uint64 {
 	return ids
 }
 
+func (c *Cluster) IsMemberRemoved(id uint64) bool {
+	return c.removed[id]
+}
+
 // PeerURLs returns a list of all peer addresses. Each address is prefixed
 // with the scheme (currently "http://"). The returned list is sorted in
 // ascending lexicographical order.
@@ -180,3 +190,93 @@ func (c Cluster) ClientURLs() []string {
 	sort.Strings(urls)
 	return urls
 }
+
+func (c Cluster) String() string {
+	sl := []string{}
+	for _, m := range c.members {
+		for _, u := range m.PeerURLs {
+			sl = append(sl, fmt.Sprintf("%s=%s", m.Name, u))
+		}
+	}
+	sort.Strings(sl)
+	return strings.Join(sl, ",")
+}
+
+func (c *Cluster) GenID(salt []byte) {
+	mIDs := c.MemberIDs()
+	b := make([]byte, 8*len(mIDs))
+	for i, id := range mIDs {
+		binary.BigEndian.PutUint64(b[8*i:], id)
+	}
+	b = append(b, salt...)
+	hash := sha1.Sum(b)
+	c.id = binary.BigEndian.Uint64(hash[:8])
+}
+
+func (c *Cluster) SetID(id uint64) {
+	c.id = id
+}
+
+func (c *Cluster) SetStore(st store.Store) {
+	c.store = st
+}
+
+// AddMember puts a new Member into the store.
+// A Member with a matching id must not exist.
+func (c *Cluster) AddMember(m *Member) {
+	b, err := json.Marshal(m.RaftAttributes)
+	if err != nil {
+		log.Panicf("marshal error: %v", err)
+	}
+	if _, err := c.store.Create(memberStoreKey(m.ID)+raftAttributesSuffix, false, string(b), false, store.Permanent); err != nil {
+		log.Panicf("add raftAttributes should never fail: %v", err)
+	}
+	b, err = json.Marshal(m.Attributes)
+	if err != nil {
+		log.Panicf("marshal error: %v", err)
+	}
+	if _, err := c.store.Create(memberStoreKey(m.ID)+attributesSuffix, false, string(b), false, store.Permanent); err != nil {
+		log.Panicf("add attributes should never fail: %v", err)
+	}
+	c.members[m.ID] = m
+}
+
+// RemoveMember removes a member from the store.
+// The given id MUST exist.
+func (c *Cluster) RemoveMember(id uint64) {
+	if _, err := c.store.Delete(memberStoreKey(id), true, true); err != nil {
+		log.Panicf("delete peer should never fail: %v", err)
+	}
+	delete(c.members, id)
+	if _, err := c.store.Create(removedMemberStoreKey(id), false, "", false, store.Permanent); err != nil {
+		log.Panicf("creating RemovedMember should never fail: %v", err)
+	}
+	c.removed[id] = true
+}
+
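// ---- editor's note (illustrative sketch, not part of the patch) ----
// For orientation: AddMember above persists each member under two store keys,
// and nodeToMember below reads them back, relying on the store returning the
// child nodes sorted by key ("attributes" sorts before "raftAttributes"):
//
//	<storeMembersPrefix>/<hex-id>/attributes     -> {"Name":"node1","ClientURLs":null}
//	<storeMembersPrefix>/<hex-id>/raftAttributes -> {"PeerURLs":null}
//
// (Key layout per memberStoreKey and the suffix constants; the JSON values
// match the fixtures in cluster_test.go.)
// ---------------------------------------------------------------------
+// nodeToMember builds a member from a store node.
+// The child nodes of the given node must be sorted by key.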
+func nodeToMember(n *store.NodeExtern) (*Member, error) {
+	m := &Member{ID: parseMemberID(n.Key)}
+	if len(n.Nodes) != 2 {
+		return m, fmt.Errorf("len(nodes) = %d, want 2", len(n.Nodes))
+	}
+	if w := n.Key + attributesSuffix; n.Nodes[0].Key != w {
+		return m, fmt.Errorf("key = %v, want %v", n.Nodes[0].Key, w)
+	}
+	if err := json.Unmarshal([]byte(*n.Nodes[0].Value), &m.Attributes); err != nil {
+		return m, fmt.Errorf("unmarshal attributes error: %v", err)
+	}
+	if w := n.Key + raftAttributesSuffix; n.Nodes[1].Key != w {
+		return m, fmt.Errorf("key = %v, want %v", n.Nodes[1].Key, w)
+	}
+	if err := json.Unmarshal([]byte(*n.Nodes[1].Value), &m.RaftAttributes); err != nil {
+		return m, fmt.Errorf("unmarshal raftAttributes error: %v", err)
+	}
+	return m, nil
+}
+
+func isKeyNotFound(err error) bool {
+	e, ok := err.(*etcdErr.Error)
+	return ok && e.ErrorCode == etcdErr.EcodeKeyNotFound
+}
diff --git a/etcdserver/cluster_store.go b/etcdserver/cluster_store.go
deleted file mode 100644
index 049fff995..000000000
--- a/etcdserver/cluster_store.go
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
-   Copyright 2014 CoreOS, Inc.
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-*/
-
-package etcdserver
-
-import (
-	"bytes"
-	"encoding/json"
-	"fmt"
-	"log"
-	"net/http"
-	"strconv"
-	"time"
-
-	etcdErr "github.com/coreos/etcd/error"
-
-	"github.com/coreos/etcd/etcdserver/stats"
-	"github.com/coreos/etcd/raft/raftpb"
-	"github.com/coreos/etcd/store"
-)
-
-const (
-	raftPrefix = "/raft"
-
-	raftAttributesSuffix = "/raftAttributes"
-	attributesSuffix     = "/attributes"
-)
-
-type ClusterStore interface {
-	Add(m Member)
-	Get() Cluster
-	Remove(id uint64)
-	IsRemoved(id uint64) bool
-}
-
-type clusterStore struct {
-	Store store.Store
-	// TODO: write the id into the actual store?
-	// TODO: save the id as string?
-	id          uint64
-	clusterName string
-}
-
-// Add puts a new Member into the store.
-// A Member with a matching id must not exist.
-func (s *clusterStore) Add(m Member) {
-	b, err := json.Marshal(m.RaftAttributes)
-	if err != nil {
-		log.Panicf("marshal error: %v", err)
-	}
-	if _, err := s.Store.Create(memberStoreKey(m.ID)+raftAttributesSuffix, false, string(b), false, store.Permanent); err != nil {
-		log.Panicf("add raftAttributes should never fail: %v", err)
-	}
-
-	b, err = json.Marshal(m.Attributes)
-	if err != nil {
-		log.Panicf("marshal error: %v", err)
-	}
-	if _, err := s.Store.Create(memberStoreKey(m.ID)+attributesSuffix, false, string(b), false, store.Permanent); err != nil {
-		log.Panicf("add attributes should never fail: %v", err)
-	}
-}
-
-// TODO(philips): keep the latest copy without going to the store to avoid the
-// lock here.
-func (s *clusterStore) Get() Cluster {
-	c := NewCluster(s.clusterName)
-	c.id = s.id
-	e, err := s.Store.Get(storeMembersPrefix, true, true)
-	if err != nil {
-		if isKeyNotFound(err) {
-			return *c
-		}
-		log.Panicf("get member should never fail: %v", err)
-	}
-	for _, n := range e.Node.Nodes {
-		m, err := nodeToMember(n)
-		if err != nil {
-			log.Panicf("unexpected nodeToMember error: %v", err)
-		}
-		if err := c.Add(m); err != nil {
-			log.Panicf("add member to cluster should never fail: %v", err)
-		}
-	}
-	return *c
-}
-
-// nodeToMember builds member through a store node.
-// the child nodes of the given node should be sorted by key.
-func nodeToMember(n *store.NodeExtern) (Member, error) {
-	m := Member{ID: parseMemberID(n.Key)}
-	if len(n.Nodes) != 2 {
-		return m, fmt.Errorf("len(nodes) = %d, want 2", len(n.Nodes))
-	}
-	if w := n.Key + attributesSuffix; n.Nodes[0].Key != w {
-		return m, fmt.Errorf("key = %v, want %v", n.Nodes[0].Key, w)
-	}
-	if err := json.Unmarshal([]byte(*n.Nodes[0].Value), &m.Attributes); err != nil {
-		return m, fmt.Errorf("unmarshal attributes error: %v", err)
-	}
-	if w := n.Key + raftAttributesSuffix; n.Nodes[1].Key != w {
-		return m, fmt.Errorf("key = %v, want %v", n.Nodes[1].Key, w)
-	}
-	if err := json.Unmarshal([]byte(*n.Nodes[1].Value), &m.RaftAttributes); err != nil {
-		return m, fmt.Errorf("unmarshal raftAttributes error: %v", err)
-	}
-	return m, nil
-}
-
-// Remove removes a member from the store.
-// The given id MUST exist.
-func (s *clusterStore) Remove(id uint64) {
-	if _, err := s.Store.Delete(memberStoreKey(id), true, true); err != nil {
-		log.Panicf("delete peer should never fail: %v", err)
-	}
-	if _, err := s.Store.Create(removedMemberStoreKey(id), false, "", false, store.Permanent); err != nil {
-		log.Panicf("creating RemovedMember should never fail: %v", err)
-	}
-}
-
-func (s *clusterStore) IsRemoved(id uint64) bool {
-	_, err := s.Store.Get(removedMemberStoreKey(id), false, false)
-	switch {
-	case err == nil:
-		return true
-	case isKeyNotFound(err):
-		return false
-	default:
-		log.Panicf("unexpected error when getting removed member %x: %v", id, err)
-		return false
-	}
-}
-
-// Sender creates the default production sender used to transport raft messages
-// in the cluster. The returned sender will update the given ServerStats and
-// LeaderStats appropriately.
-func Sender(t *http.Transport, cls ClusterStore, ss *stats.ServerStats, ls *stats.LeaderStats) func(msgs []raftpb.Message) {
-	c := &http.Client{Transport: t}
-
-	return func(msgs []raftpb.Message) {
-		for _, m := range msgs {
-			// TODO: reuse go routines
-			// limit the number of outgoing connections for the same receiver
-			go send(c, cls, m, ss, ls)
-		}
-	}
-}
-
-// send uses the given client to send a message to a member in the given
-// ClusterStore, retrying up to 3 times for each message. The given
-// ServerStats and LeaderStats are updated appropriately
-func send(c *http.Client, cls ClusterStore, m raftpb.Message, ss *stats.ServerStats, ls *stats.LeaderStats) {
-	cid := cls.Get().ID()
-	// TODO (xiangli): reasonable retry logic
-	for i := 0; i < 3; i++ {
-		u := cls.Get().Pick(m.To)
-		if u == "" {
-			// TODO: unknown peer id.. what do we do? I
-			// don't think his should ever happen, need to
-			// look into this further.
-			log.Printf("etcdhttp: no addr for %d", m.To)
-			return
-		}
-		u = fmt.Sprintf("%s%s", u, raftPrefix)
-
-		// TODO: don't block. we should be able to have 1000s
-		// of messages out at a time.
-		data, err := m.Marshal()
-		if err != nil {
-			log.Println("etcdhttp: dropping message:", err)
-			return // drop bad message
-		}
-		if m.Type == raftpb.MsgApp {
-			ss.SendAppendReq(len(data))
-		}
-		to := idAsHex(m.To)
-		fs := ls.Follower(to)
-
-		start := time.Now()
-		sent := httpPost(c, u, cid, data)
-		end := time.Now()
-		if sent {
-			fs.Succ(end.Sub(start))
-			return
-		}
-		fs.Fail()
-		// TODO: backoff
-	}
-}
-
-// httpPost POSTs a data payload to a url using the given client. Returns true
-// if the POST succeeds, false on any failure.
-func httpPost(c *http.Client, url string, cid uint64, data []byte) bool {
-	req, err := http.NewRequest("POST", url, bytes.NewBuffer(data))
-	if err != nil {
-		// TODO: log the error?
-		return false
-	}
-	req.Header.Set("Content-Type", "application/protobuf")
-	req.Header.Set("X-Etcd-Cluster-ID", strconv.FormatUint(cid, 16))
-	resp, err := c.Do(req)
-	if err != nil {
-		// TODO: log the error?
-		return false
-	}
-	resp.Body.Close()
-
-	switch resp.StatusCode {
-	case http.StatusPreconditionFailed:
-		// TODO: shutdown the etcdserver gracefully?
-		log.Fatalf("etcd: conflicting cluster ID with the target cluster (%s != %s). Exiting.", resp.Header.Get("X-Etcd-Cluster-ID"), strconv.FormatUint(cid, 16))
-		return false
-	case http.StatusForbidden:
-		// TODO: stop the server
-		log.Fatalf("etcd: this member has been permanently removed from the cluster. Exiting.")
-		return false
-	case http.StatusNoContent:
-		return true
-	default:
-		return false
-	}
-}
-
-func isKeyNotFound(err error) bool {
-	e, ok := err.(*etcdErr.Error)
-	return ok && e.ErrorCode == etcdErr.EcodeKeyNotFound
-}
diff --git a/etcdserver/cluster_store_test.go b/etcdserver/cluster_store_test.go
deleted file mode 100644
index 11b561b6e..000000000
--- a/etcdserver/cluster_store_test.go
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
-   Copyright 2014 CoreOS, Inc.
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-*/
-
-package etcdserver
-
-import (
-	"path"
-	"reflect"
-	"testing"
-	"time"
-
-	etcdErr "github.com/coreos/etcd/error"
-	"github.com/coreos/etcd/store"
-)
-
-func TestClusterStoreAdd(t *testing.T) {
-	st := &storeRecorder{}
-	ps := &clusterStore{Store: st}
-	ps.Add(newTestMember(1, nil, "node1", nil))
-
-	wactions := []action{
-		{
-			name: "Create",
-			params: []interface{}{
-				path.Join(storeMembersPrefix, "1", "raftAttributes"),
-				false,
-				`{"PeerURLs":null}`,
-				false,
-				store.Permanent,
-			},
-		},
-		{
-			name: "Create",
-			params: []interface{}{
-				path.Join(storeMembersPrefix, "1", "attributes"),
-				false,
-				`{"Name":"node1"}`,
-				false,
-				store.Permanent,
-			},
-		},
-	}
-	if g := st.Action(); !reflect.DeepEqual(g, wactions) {
-		t.Errorf("actions = %v, want %v", g, wactions)
-	}
-}
-
-func TestClusterStoreGet(t *testing.T) {
-	tests := []struct {
-		mems  []Member
-		wmems []Member
-	}{
-		{
-			[]Member{newTestMember(1, nil, "node1", nil)},
-			[]Member{newTestMember(1, nil, "node1", nil)},
-		},
-		{
-			[]Member{},
-			[]Member{},
-		},
-		{
-			[]Member{
-				newTestMember(1, nil, "node1", nil),
-				newTestMember(2, nil, "node2", nil),
-			},
-			[]Member{
-				newTestMember(1, nil, "node1", nil),
-				newTestMember(2, nil, "node2", nil),
-			},
-		},
-		{
-			[]Member{
-				newTestMember(2, nil, "node2", nil),
-				newTestMember(1, nil, "node1", nil),
-			},
-			[]Member{
-				newTestMember(1, nil, "node1", nil),
-				newTestMember(2, nil, "node2", nil),
-			},
-		},
-	}
-	for i, tt := range tests {
-		c := NewCluster("")
-		if err := c.AddSlice(tt.mems); err != nil {
-			t.Fatal(err)
-		}
-		c.GenID(nil)
-		cs := &clusterStore{Store: newGetAllStore(), id: c.id}
-		for _, m := range tt.mems {
-			cs.Add(m)
-		}
-		if g := cs.Get(); !reflect.DeepEqual(&g, c) {
-			t.Errorf("#%d: mems = %v, want %v", i, &g, c)
-		}
-	}
-}
-
-func TestClusterStoreRemove(t *testing.T) {
-	st := &storeRecorder{}
-	cs := &clusterStore{Store: st}
-	cs.Remove(1)
-
-	wactions := []action{
-		{name: "Delete", params: []interface{}{memberStoreKey(1), true, true}},
-		{name: "Create", params: []interface{}{removedMemberStoreKey(1), false, "", false, store.Permanent}},
-	}
-	if !reflect.DeepEqual(st.Action(), wactions) {
-		t.Errorf("actions = %v, want %v", st.Action(), wactions)
-	}
-}
-
-func TestClusterStoreIsRemovedFalse(t *testing.T) {
-	st := &errStoreRecorder{err: etcdErr.NewError(etcdErr.EcodeKeyNotFound, "", 0)}
-	cs := clusterStore{Store: st}
-	if ok := cs.IsRemoved(1); ok != false {
-		t.Errorf("IsRemoved = %v, want %v", ok, false)
-	}
-}
-
-func TestClusterStoreIsRemovedTrue(t *testing.T) {
-	st := &storeRecorder{}
-	cs := &clusterStore{Store: st}
-	if ok := cs.IsRemoved(1); ok != true {
-		t.Errorf("IsRemoved = %v, want %v", ok, true)
-	}
-	wactions := []action{
-		{name: "Get", params: []interface{}{removedMemberStoreKey(1), false, false}},
-	}
-	if !reflect.DeepEqual(st.Action(), wactions) {
-		t.Errorf("actions = %v, want %v", st.Action(), wactions)
-	}
-}
-
-func TestNodeToMemberFail(t *testing.T) {
-	tests := []*store.NodeExtern{
-		{Key: "/1234", Nodes: []*store.NodeExtern{
-			{Key: "/1234/strange"},
-		}},
-		{Key: "/1234", Nodes: []*store.NodeExtern{
-			{Key: "/1234/dynamic", Value: stringp("garbage")},
-		}},
-		{Key: "/1234", Nodes: []*store.NodeExtern{
-			{Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)},
-		}},
-		{Key: "/1234", Nodes: []*store.NodeExtern{
-			{Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)},
-			{Key: "/1234/strange"},
-		}},
-		{Key: "/1234", Nodes: []*store.NodeExtern{
-			{Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)},
-			{Key: "/1234/static", Value: stringp("garbage")},
"/1234/static", Value: stringp("garbage")}, - }}, - {Key: "/1234", Nodes: []*store.NodeExtern{ - {Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)}, - {Key: "/1234/static", Value: stringp(`{"Name":"node1","ClientURLs":null}`)}, - {Key: "/1234/strange"}, - }}, - } - for i, tt := range tests { - if _, err := nodeToMember(tt); err == nil { - t.Errorf("#%d: unexpected nil error", i) - } - } -} - -func TestNodeToMember(t *testing.T) { - n := &store.NodeExtern{Key: "/1234", Nodes: []*store.NodeExtern{ - {Key: "/1234/attributes", Value: stringp(`{"Name":"node1","ClientURLs":null}`)}, - {Key: "/1234/raftAttributes", Value: stringp(`{"PeerURLs":null}`)}, - }} - wm := Member{ID: 0x1234, RaftAttributes: RaftAttributes{}, Attributes: Attributes{Name: "node1"}} - m, err := nodeToMember(n) - if err != nil { - t.Fatalf("unexpected nodeToMember error: %v", err) - } - if !reflect.DeepEqual(m, wm) { - t.Errorf("member = %+v, want %+v", m, wm) - } -} - -// simpleStore implements basic create and get. -type simpleStore struct { - storeRecorder - st map[string]string -} - -func (s *simpleStore) Create(key string, _ bool, value string, _ bool, _ time.Time) (*store.Event, error) { - if s.st == nil { - s.st = make(map[string]string) - } - s.st[key] = value - return nil, nil -} -func (s *simpleStore) Get(key string, _, _ bool) (*store.Event, error) { - val, ok := s.st[key] - if !ok { - return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, "", 0) - } - ev := &store.Event{Node: &store.NodeExtern{Key: key, Value: stringp(val)}} - return ev, nil -} - -// getAllStore embeds simpleStore, and makes Get return all keys sorted. -// It uses real store because it uses lots of logic in store and is not easy -// to mock. -// TODO: use mock one to do testing -type getAllStore struct { - store.Store -} - -func newGetAllStore() *getAllStore { - return &getAllStore{store.New()} -} - -func newTestMember(id uint64, peerURLs []string, name string, clientURLs []string) Member { - return Member{ - ID: id, - RaftAttributes: RaftAttributes{PeerURLs: peerURLs}, - Attributes: Attributes{Name: name, ClientURLs: clientURLs}, - } -} - -func newTestMemberp(id uint64, peerURLs []string, name string, clientURLs []string) *Member { - m := newTestMember(id, peerURLs, name, clientURLs) - return &m -} diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go index 0f36da058..e8a0ab118 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/cluster_test.go @@ -17,156 +17,14 @@ package etcdserver import ( + "path" "reflect" "testing" + + "github.com/coreos/etcd/store" ) -func TestClusterAddSlice(t *testing.T) { - tests := []struct { - mems []Member - want *Cluster - }{ - { - []Member{}, - NewCluster(""), - }, - { - []Member{ - newTestMember(1, []string{"foo", "bar"}, "", nil), - newTestMember(2, []string{"baz"}, "", nil), - }, - &Cluster{ - members: map[uint64]*Member{ - 1: newTestMemberp(1, []string{"foo", "bar"}, "", nil), - 2: newTestMemberp(2, []string{"baz"}, "", nil), - }, - }, - }, - } - for i, tt := range tests { - c := NewCluster("") - if err := c.AddSlice(tt.mems); err != nil { - t.Errorf("#%d: err=%#v, want nil", i, err) - continue - } - if !reflect.DeepEqual(c, tt.want) { - t.Errorf("#%d: c=%#v, want %#v", i, c, tt.want) - } - } -} - -func TestClusterAddSliceBad(t *testing.T) { - c := Cluster{ - members: map[uint64]*Member{ - 1: newTestMemberp(1, nil, "", nil), - }, - } - if err := c.AddSlice([]Member{newTestMember(1, nil, "", nil)}); err == nil { - t.Error("want err, but got nil") - } -} - -func TestClusterPick(t 
-	cs := Cluster{
-		members: map[uint64]*Member{
-			1: newTestMemberp(1, []string{"abc", "def", "ghi", "jkl", "mno", "pqr", "stu"}, "", nil),
-			2: newTestMemberp(2, []string{"xyz"}, "", nil),
-			3: newTestMemberp(3, []string{}, "", nil),
-		},
-	}
-	ids := map[string]bool{
-		"abc": true,
-		"def": true,
-		"ghi": true,
-		"jkl": true,
-		"mno": true,
-		"pqr": true,
-		"stu": true,
-	}
-	for i := 0; i < 1000; i++ {
-		a := cs.Pick(1)
-		if !ids[a] {
-			t.Errorf("returned ID %q not in expected range!", a)
-			break
-		}
-	}
-	if b := cs.Pick(2); b != "xyz" {
-		t.Errorf("id=%q, want %q", b, "xyz")
-	}
-	if c := cs.Pick(3); c != "" {
-		t.Errorf("id=%q, want %q", c, "")
-	}
-	if d := cs.Pick(4); d != "" {
-		t.Errorf("id=%q, want %q", d, "")
-	}
-}
-
-func TestClusterFind(t *testing.T) {
-	tests := []struct {
-		id    uint64
-		name  string
-		mems  []Member
-		match bool
-	}{
-		{
-			1,
-			"node1",
-			[]Member{newTestMember(1, nil, "node1", nil)},
-			true,
-		},
-		{
-			2,
-			"foobar",
-			[]Member{},
-			false,
-		},
-		{
-			2,
-			"node2",
-			[]Member{newTestMember(1, nil, "node1", nil), newTestMember(2, nil, "node2", nil)},
-			true,
-		},
-		{
-			3,
-			"node3",
-			[]Member{newTestMember(1, nil, "node1", nil), newTestMember(2, nil, "node2", nil)},
-			false,
-		},
-	}
-	for i, tt := range tests {
-		c := NewCluster("")
-		c.AddSlice(tt.mems)
-
-		m := c.FindID(tt.id)
-		if m == nil && !tt.match {
-			continue
-		}
-		if m == nil && tt.match {
-			t.Errorf("#%d: expected match got empty", i)
-		}
-		if m.Name != tt.name && tt.match {
-			t.Errorf("#%d: got = %v, want %v", i, m.Name, tt.name)
-		}
-	}
-
-	for i, tt := range tests {
-		c := NewCluster("")
-		c.AddSlice(tt.mems)
-
-		m := c.FindID(tt.id)
-		if m == nil && !tt.match {
-			continue
-		}
-		if m == nil && tt.match {
-			t.Errorf("#%d: expected match got empty", i)
-		}
-		if m.ID != tt.id && tt.match {
-			t.Errorf("#%d: got = %v, want %v", i, m.Name, tt.id)
-		}
-	}
-}
-
-func TestClusterSet(t *testing.T) {
+func TestClusterFromString(t *testing.T) {
 	tests := []struct {
 		f    string
 		mems []Member
@@ -174,54 +32,28 @@ func TestClusterSet(t *testing.T) {
 		{
 			"mem1=http://10.0.0.1:2379,mem1=http://128.193.4.20:2379,mem2=http://10.0.0.2:2379,default=http://127.0.0.1:2379",
 			[]Member{
-				newTestMember(3736794188555456841, []string{"http://10.0.0.1:2379", "http://128.193.4.20:2379"}, "mem1", nil),
-				newTestMember(5674507346857578431, []string{"http://10.0.0.2:2379"}, "mem2", nil),
-				newTestMember(2676999861503984872, []string{"http://127.0.0.1:2379"}, "default", nil),
+				newTestMember(4322322643958477905, []string{"http://10.0.0.1:2379", "http://128.193.4.20:2379"}, "mem1", nil),
+				newTestMember(3141198903430435750, []string{"http://10.0.0.2:2379"}, "mem2", nil),
+				newTestMember(12762790032478827328, []string{"http://127.0.0.1:2379"}, "default", nil),
 			},
 		},
 	}
 	for i, tt := range tests {
-		c := NewCluster("")
-		if err := c.AddSlice(tt.mems); err != nil {
-			t.Error(err)
+		c, err := NewClusterFromString("abc", tt.f)
+		if err != nil {
+			t.Fatalf("#%d: unexpected new error: %v", i, err)
 		}
-
-		g := Cluster{}
-		g.SetMembersFromString(tt.f)
-
-		if g.String() != c.String() {
-			t.Errorf("#%d: set = %v, want %v", i, g, c)
+		if c.name != "abc" {
+			t.Errorf("#%d: name = %v, want abc", i, c.name)
+		}
+		wc := newTestCluster(tt.mems)
+		if !reflect.DeepEqual(c.members, wc.members) {
+			t.Errorf("#%d: members = %+v, want %+v", i, c.members, wc.members)
 		}
 	}
 }
 
-func TestClusterGenID(t *testing.T) {
-	cs := NewCluster("")
-	cs.AddSlice([]Member{
-		newTestMember(1, nil, "", nil),
-		newTestMember(2, nil, "", nil),
-	})
-
-	cs.GenID(nil)
-	if cs.ID() == 0 {
-		t.Fatalf("cluster.ID = %v, want not 0", cs.ID())
-	}
-	previd := cs.ID()
-
-	cs.Add(newTestMember(3, nil, "", nil))
-	cs.GenID(nil)
-	if cs.ID() == previd {
-		t.Fatalf("cluster.ID = %v, want not %v", cs.ID(), previd)
-	}
-	previd = cs.ID()
-
-	cs.GenID([]byte("http://discovery.etcd.io/12345678"))
-	if cs.ID() == previd {
-		t.Fatalf("cluster.ID = %v, want not %v", cs.ID(), previd)
-	}
-}
-
-func TestClusterSetBad(t *testing.T) {
+func TestClusterFromStringBad(t *testing.T) {
 	tests := []string{
 		// invalid URL
 		"%^",
@@ -230,41 +62,111 @@ func TestClusterSetBad(t *testing.T) {
 		"mem1,mem2=http://128.193.4.20:2379,mem3=http://10.0.0.2:2379",
 		// TODO(philips): anyone know of a 64 bit sha1 hash collision
 		// "06b2f82fd81b2c20=http://128.193.4.20:2379,02c60cb75083ceef=http://128.193.4.20:2379",
+		// the same url for two members
+		"mem1=http://128.193.4.20:2379,mem2=http://128.193.4.20:2379",
 	}
 	for i, tt := range tests {
-		g := NewCluster("")
-		if err := g.SetMembersFromString(tt); err == nil {
-			t.Errorf("#%d: set = %v, want err", i, tt)
+		if _, err := NewClusterFromString("abc", tt); err == nil {
+			t.Errorf("#%d: unexpected successful new, want err", i)
+		}
+	}
+}
+
+func TestClusterFromStore(t *testing.T) {
+	tests := []struct {
+		mems []Member
+	}{
+		{
+			[]Member{newTestMember(1, nil, "node1", nil)},
+		},
+		{
+			[]Member{},
+		},
+		{
+			[]Member{
+				newTestMember(1, nil, "node1", nil),
+				newTestMember(2, nil, "node2", nil),
+			},
+		},
+	}
+	for i, tt := range tests {
+		st := store.New()
+		hc := newTestCluster(nil)
+		hc.SetStore(st)
+		for _, m := range tt.mems {
+			hc.AddMember(&m)
+		}
+		c := NewClusterFromStore("abc", st)
+		if c.name != "abc" {
+			t.Errorf("#%d: name = %v, want %v", i, c.name, "abc")
+		}
+		wc := newTestCluster(tt.mems)
+		if !reflect.DeepEqual(c.members, wc.members) {
+			t.Errorf("#%d: members = %v, want %v", i, c.members, wc.members)
+		}
+	}
+}
+
+func TestClusterMember(t *testing.T) {
+	membs := []Member{
+		newTestMember(1, nil, "node1", nil),
+		newTestMember(2, nil, "node2", nil),
+	}
+	tests := []struct {
+		id    uint64
+		match bool
+	}{
+		{1, true},
+		{2, true},
+		{3, false},
+	}
+	for i, tt := range tests {
+		c := newTestCluster(membs)
+		m := c.Member(tt.id)
+		if g := m != nil; g != tt.match {
+			t.Errorf("#%d: find member = %v, want %v", i, g, tt.match)
+		}
+		if m != nil && m.ID != tt.id {
+			t.Errorf("#%d: id = %x, want %x", i, m.ID, tt.id)
+		}
+	}
+}
+
+func TestClusterMemberFromName(t *testing.T) {
+	membs := []Member{
+		newTestMember(1, nil, "node1", nil),
+		newTestMember(2, nil, "node2", nil),
+	}
+	tests := []struct {
+		name  string
+		match bool
+	}{
+		{"node1", true},
+		{"node2", true},
+		{"node3", false},
+	}
+	for i, tt := range tests {
+		c := newTestCluster(membs)
+		m := c.MemberFromName(tt.name)
+		if g := m != nil; g != tt.match {
+			t.Errorf("#%d: find member = %v, want %v", i, g, tt.match)
+		}
+		if m != nil && m.Name != tt.name {
+			t.Errorf("#%d: name = %v, want %v", i, m.Name, tt.name)
 		}
 	}
 }
 
 func TestClusterMemberIDs(t *testing.T) {
-	cs := NewCluster("")
-	cs.AddSlice([]Member{
+	c := newTestCluster([]Member{
 		newTestMember(1, nil, "", nil),
 		newTestMember(4, nil, "", nil),
 		newTestMember(100, nil, "", nil),
 	})
 	w := []uint64{1, 4, 100}
-	g := cs.MemberIDs()
+	g := c.MemberIDs()
 	if !reflect.DeepEqual(w, g) {
-		t.Errorf("IDs=%+v, want %+v", g, w)
-	}
-}
-
-func TestClusterAddBad(t *testing.T) {
-	// Should not be possible to add the same ID multiple times
-	mems := []Member{
-		newTestMember(1, nil, "mem1", nil),
-		newTestMember(1, nil, "mem2", nil),
-	}
-	c := NewCluster("")
:= NewCluster("") - c.Add(newTestMember(1, nil, "mem1", nil)) - for i, m := range mems { - if err := c.Add(m); err == nil { - t.Errorf("#%d: set = %v, want err", i, err) - } + t.Errorf("IDs = %+v, want %+v", g, w) } } @@ -315,11 +217,7 @@ func TestClusterPeerURLs(t *testing.T) { } for i, tt := range tests { - c := NewCluster("") - if err := c.AddSlice(tt.mems); err != nil { - t.Errorf("AddSlice error: %v", err) - continue - } + c := newTestCluster(tt.mems) urls := c.PeerURLs() if !reflect.DeepEqual(urls, tt.wurls) { t.Errorf("#%d: PeerURLs = %v, want %v", i, urls, tt.wurls) @@ -374,14 +272,152 @@ func TestClusterClientURLs(t *testing.T) { } for i, tt := range tests { - c := NewCluster("") - if err := c.AddSlice(tt.mems); err != nil { - t.Errorf("AddSlice error: %v", err) - continue - } + c := newTestCluster(tt.mems) urls := c.ClientURLs() if !reflect.DeepEqual(urls, tt.wurls) { t.Errorf("#%d: ClientURLs = %v, want %v", i, urls, tt.wurls) } } } + +func TestClusterGenID(t *testing.T) { + cs := newTestCluster([]Member{ + newTestMember(1, nil, "", nil), + newTestMember(2, nil, "", nil), + }) + + cs.GenID(nil) + if cs.ID() == 0 { + t.Fatalf("cluster.ID = %v, want not 0", cs.ID()) + } + previd := cs.ID() + + cs.SetStore(&storeRecorder{}) + cs.AddMember(newTestMemberp(3, nil, "", nil)) + cs.GenID(nil) + if cs.ID() == previd { + t.Fatalf("cluster.ID = %v, want not %v", cs.ID(), previd) + } + previd = cs.ID() + + cs.GenID([]byte("http://discovery.etcd.io/12345678")) + if cs.ID() == previd { + t.Fatalf("cluster.ID = %v, want not %v", cs.ID(), previd) + } +} + +func TestNodeToMemberBad(t *testing.T) { + tests := []*store.NodeExtern{ + {Key: "/1234", Nodes: []*store.NodeExtern{ + {Key: "/1234/strange"}, + }}, + {Key: "/1234", Nodes: []*store.NodeExtern{ + {Key: "/1234/dynamic", Value: stringp("garbage")}, + }}, + {Key: "/1234", Nodes: []*store.NodeExtern{ + {Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)}, + }}, + {Key: "/1234", Nodes: []*store.NodeExtern{ + {Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)}, + {Key: "/1234/strange"}, + }}, + {Key: "/1234", Nodes: []*store.NodeExtern{ + {Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)}, + {Key: "/1234/static", Value: stringp("garbage")}, + }}, + {Key: "/1234", Nodes: []*store.NodeExtern{ + {Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)}, + {Key: "/1234/static", Value: stringp(`{"Name":"node1","ClientURLs":null}`)}, + {Key: "/1234/strange"}, + }}, + } + for i, tt := range tests { + if _, err := nodeToMember(tt); err == nil { + t.Errorf("#%d: unexpected nil error", i) + } + } +} + +func TestClusterAddMember(t *testing.T) { + st := &storeRecorder{} + c := newTestCluster(nil) + c.SetStore(st) + c.AddMember(newTestMemberp(1, nil, "node1", nil)) + + wactions := []action{ + { + name: "Create", + params: []interface{}{ + path.Join(storeMembersPrefix, "1", "raftAttributes"), + false, + `{"PeerURLs":null}`, + false, + store.Permanent, + }, + }, + { + name: "Create", + params: []interface{}{ + path.Join(storeMembersPrefix, "1", "attributes"), + false, + `{"Name":"node1"}`, + false, + store.Permanent, + }, + }, + } + if g := st.Action(); !reflect.DeepEqual(g, wactions) { + t.Errorf("actions = %v, want %v", g, wactions) + } +} + +func TestClusterRemoveMember(t *testing.T) { + st := &storeRecorder{} + c := newTestCluster(nil) + c.SetStore(st) + c.RemoveMember(1) + + wactions := []action{ + {name: "Delete", params: []interface{}{memberStoreKey(1), true, true}}, + {name: "Create", params: 
+	}
+	if !reflect.DeepEqual(st.Action(), wactions) {
+		t.Errorf("actions = %v, want %v", st.Action(), wactions)
+	}
+}
+
+func TestNodeToMember(t *testing.T) {
+	n := &store.NodeExtern{Key: "/1234", Nodes: []*store.NodeExtern{
+		{Key: "/1234/attributes", Value: stringp(`{"Name":"node1","ClientURLs":null}`)},
+		{Key: "/1234/raftAttributes", Value: stringp(`{"PeerURLs":null}`)},
+	}}
+	wm := &Member{ID: 0x1234, RaftAttributes: RaftAttributes{}, Attributes: Attributes{Name: "node1"}}
+	m, err := nodeToMember(n)
+	if err != nil {
+		t.Fatalf("unexpected nodeToMember error: %v", err)
+	}
+	if !reflect.DeepEqual(m, wm) {
+		t.Errorf("member = %+v, want %+v", m, wm)
+	}
+}
+
+func newTestCluster(membs []Member) *Cluster {
+	c := &Cluster{members: make(map[uint64]*Member), removed: make(map[uint64]bool)}
+	for i, m := range membs {
+		c.members[m.ID] = &membs[i]
+	}
+	return c
+}
+
+func newTestMember(id uint64, peerURLs []string, name string, clientURLs []string) Member {
+	return Member{
+		ID:             id,
+		RaftAttributes: RaftAttributes{PeerURLs: peerURLs},
+		Attributes:     Attributes{Name: name, ClientURLs: clientURLs},
+	}
+}
+
+func newTestMemberp(id uint64, peerURLs []string, name string, clientURLs []string) *Member {
+	m := newTestMember(id, peerURLs, name, clientURLs)
+	return &m
+}
diff --git a/etcdserver/config.go b/etcdserver/config.go
index 1bdb0c34a..33cd09fcf 100644
--- a/etcdserver/config.go
+++ b/etcdserver/config.go
@@ -40,7 +40,7 @@ type ServerConfig struct {
 // VerifyBootstrapConfig sanity-checks the initial config and returns an error
 // for things that should never happen.
 func (c *ServerConfig) VerifyBootstrapConfig() error {
-	m := c.Cluster.FindName(c.Name)
+	m := c.Cluster.MemberFromName(c.Name)
 	// Make sure the cluster at least contains the local server.
 	if m == nil {
 		return fmt.Errorf("couldn't find local name %s in the initial cluster configuration", c.Name)
diff --git a/etcdserver/config_test.go b/etcdserver/config_test.go
index f08042091..1531d18f7 100644
--- a/etcdserver/config_test.go
+++ b/etcdserver/config_test.go
@@ -44,8 +44,7 @@ func TestBootstrapConfigVerify(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		cluster := &Cluster{}
-		err := cluster.SetMembersFromString(tt.clusterSetting)
+		cluster, err := NewClusterFromString("", tt.clusterSetting)
 		if err != nil && tt.shouldError {
 			continue
 		}
diff --git a/etcdserver/etcdhttp/http.go b/etcdserver/etcdhttp/http.go
index 6a91b393a..99e1b316e 100644
--- a/etcdserver/etcdhttp/http.go
+++ b/etcdserver/etcdhttp/http.go
@@ -59,12 +59,12 @@ var errClosed = errors.New("etcdhttp: client closed connection")
 // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
 func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
 	sh := &serverHandler{
-		server:       server,
-		clusterStore: server.ClusterStore,
-		stats:        server,
-		timer:        server,
-		timeout:      defaultServerTimeout,
-		clock:        clockwork.NewRealClock(),
+		server:      server,
+		clusterInfo: server.Cluster,
+		stats:       server,
+		timer:       server,
+		timeout:     defaultServerTimeout,
+		clock:       clockwork.NewRealClock(),
 	}
 	mux := http.NewServeMux()
 	mux.HandleFunc(keysPrefix, sh.serveKeys)
@@ -84,10 +84,10 @@ // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
 func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler {
 	sh := &serverHandler{
-		server:       server,
-		stats:        server,
-		clusterStore: server.ClusterStore,
-		clock:        clockwork.NewRealClock(),
+		server:      server,
+		stats:       server,
+		clusterInfo: server.Cluster,
+		clock:       clockwork.NewRealClock(),
 	}
 	mux := http.NewServeMux()
 	mux.HandleFunc(raftPrefix, sh.serveRaft)
@@ -97,12 +97,12 @@ func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler {
 
 // serverHandler provides http.Handlers for etcd client and raft communication.
 type serverHandler struct {
-	timeout      time.Duration
-	server       etcdserver.Server
-	stats        etcdserver.Stats
-	timer        etcdserver.RaftTimer
-	clusterStore etcdserver.ClusterStore
-	clock        clockwork.Clock
+	timeout     time.Duration
+	server      etcdserver.Server
+	stats       etcdserver.Stats
+	timer       etcdserver.RaftTimer
+	clusterInfo etcdserver.ClusterInfo
+	clock       clockwork.Clock
 }
 
 func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
@@ -145,7 +145,7 @@ func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) {
 	if !allowMethod(w, r.Method, "GET", "HEAD") {
 		return
 	}
-	endpoints := h.clusterStore.Get().ClientURLs()
+	endpoints := h.clusterInfo.ClientURLs()
 	w.Write([]byte(strings.Join(endpoints, ", ")))
 }
 
@@ -267,7 +267,7 @@ func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	wcid := strconv.FormatUint(h.clusterStore.Get().ID(), 16)
+	wcid := strconv.FormatUint(h.clusterInfo.ID(), 16)
 	w.Header().Set("X-Etcd-Cluster-ID", wcid)
 
 	gcid := r.Header.Get("X-Etcd-Cluster-ID")
diff --git a/etcdserver/etcdhttp/http_test.go b/etcdserver/etcdhttp/http_test.go
index 00c7f3a83..8450a7e2c 100644
--- a/etcdserver/etcdhttp/http_test.go
+++ b/etcdserver/etcdhttp/http_test.go
@@ -610,7 +610,7 @@ func TestV2DeprecatedMachinesEndpoint(t *testing.T) {
 		{"POST", http.StatusMethodNotAllowed},
 	}
 
-	m := NewClientHandler(&etcdserver.EtcdServer{ClusterStore: &fakeCluster{}})
+	m := NewClientHandler(&etcdserver.EtcdServer{Cluster: &etcdserver.Cluster{}})
 	s := httptest.NewServer(m)
 	defer s.Close()
 
@@ -632,19 +632,14 @@ func TestServeMachines(t *testing.T) {
 	cluster := &fakeCluster{
-		members: []etcdserver.Member{
-			{ID: 0xBEEF0, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080"}}},
-			{ID: 0xBEEF1, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8081"}}},
-			{ID: 0xBEEF2, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8082"}}},
-		},
+		clientURLs: []string{"http://localhost:8080", "http://localhost:8081", "http://localhost:8082"},
 	}
-
 	writer := httptest.NewRecorder()
 	req, err := http.NewRequest("GET", "", nil)
 	if err != nil {
 		t.Fatal(err)
 	}
-	h := &serverHandler{clusterStore: cluster}
+	h := &serverHandler{clusterInfo: cluster}
 	h.serveMachines(writer, req)
 	w := "http://localhost:8080, http://localhost:8081, http://localhost:8082"
 	if g := writer.Body.String(); g != w {
@@ -981,9 +976,9 @@ func TestServeRaft(t *testing.T) {
 		}
 		req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID)
 		h := &serverHandler{
-			timeout:      time.Hour,
-			server:       &errServer{tt.serverErr},
-			clusterStore: &fakeCluster{},
+			timeout:     time.Hour,
+			server:      &errServer{tt.serverErr},
+			clusterInfo: &fakeCluster{id: 0},
 		}
 		rw := httptest.NewRecorder()
 		h.serveRaft(rw, req)
@@ -1750,17 +1745,9 @@ func TestTrimNodeExternPrefix(t *testing.T) {
 }
 
 type fakeCluster struct {
-	members []etcdserver.Member
+	id         uint64
+	clientURLs []string
 }
 
-func (c *fakeCluster) Add(m etcdserver.Member) { return }
-
-func (c *fakeCluster) Get() etcdserver.Cluster {
-	cl := etcdserver.NewCluster("")
-	cl.AddSlice(c.members)
-	return *cl
-}
-
-func (c *fakeCluster) Remove(id uint64) { return }
-
-func (c *fakeCluster) IsRemoved(id uint64) bool { return false }
+func (c *fakeCluster) ID() uint64           { return c.id }
+func (c *fakeCluster) ClientURLs() []string { return c.clientURLs }
diff --git a/etcdserver/member.go b/etcdserver/member.go
index 5a2ca54d0..b4d40252f 100644
--- a/etcdserver/member.go
+++ b/etcdserver/member.go
@@ -21,6 +21,7 @@ import (
 	"encoding/binary"
 	"fmt"
 	"log"
+	"math/rand"
 	"path"
 	"sort"
 	"strconv"
@@ -71,6 +72,15 @@ func NewMember(name string, peerURLs types.URLs, clusterName string, now *time.T
 	return m
 }
 
+// Pick chooses a random address from the Member's PeerURLs and returns it as
+// an addressable URI. The Member must have at least one peer URL; Pick panics
+// otherwise.
+func (m *Member) Pick() string {
+	if len(m.PeerURLs) == 0 {
+		panic("member should always have some peer url")
+	}
+	return m.PeerURLs[rand.Intn(len(m.PeerURLs))]
+}
+
 func memberStoreKey(id uint64) string {
 	return path.Join(storeMembersPrefix, idAsHex(id))
 }
diff --git a/etcdserver/member_test.go b/etcdserver/member_test.go
index a45221c3d..011007c3a 100644
--- a/etcdserver/member_test.go
+++ b/etcdserver/member_test.go
@@ -53,3 +53,36 @@ func TestMemberTime(t *testing.T) {
 		}
 	}
 }
+
+func TestMemberPick(t *testing.T) {
+	tests := []struct {
+		memb *Member
+		urls map[string]bool
+	}{
+		{
+			newTestMemberp(1, []string{"abc", "def", "ghi", "jkl", "mno", "pqr", "stu"}, "", nil),
+			map[string]bool{
+				"abc": true,
+				"def": true,
+				"ghi": true,
+				"jkl": true,
+				"mno": true,
+				"pqr": true,
+				"stu": true,
+			},
+		},
+		{
+			newTestMemberp(2, []string{"xyz"}, "", nil),
+			map[string]bool{"xyz": true},
+		},
+	}
+	for i, tt := range tests {
+		for j := 0; j < 1000; j++ {
+			a := tt.memb.Pick()
+			if !tt.urls[a] {
+				t.Errorf("#%d: returned URL %q not in expected range!", i, a)
+				break
+			}
+		}
+	}
+}
diff --git a/etcdserver/sender.go b/etcdserver/sender.go
new file mode 100644
index 000000000..62aaaa71b
--- /dev/null
+++ b/etcdserver/sender.go
@@ -0,0 +1,121 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package etcdserver
+
+import (
+	"bytes"
+	"fmt"
+	"log"
+	"net/http"
+	"strconv"
+	"time"
+
+	"github.com/coreos/etcd/etcdserver/stats"
+	"github.com/coreos/etcd/raft/raftpb"
+)
+
+const raftPrefix = "/raft"
+
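// ---- editor's note (illustrative sketch, not part of the patch) ----
// How the sender is wired up, mirroring NewServer in the server.go hunk
// below; rd here is assumed to be a raft.Ready carrying outgoing messages:
//
//	send := Sender(cfg.Transport, cfg.Cluster, sstats, lstats)
//	send(rd.Messages) // each message is POSTed to a peer's /raft endpoint
// ---------------------------------------------------------------------
+// Sender creates the default production sender used to transport raft messages
+// in the cluster. The returned sender will update the given ServerStats and
+// LeaderStats appropriately.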
+func Sender(t *http.Transport, cl *Cluster, ss *stats.ServerStats, ls *stats.LeaderStats) func(msgs []raftpb.Message) {
+	c := &http.Client{Transport: t}
+
+	return func(msgs []raftpb.Message) {
+		for _, m := range msgs {
+			// TODO: reuse go routines
+			// limit the number of outgoing connections for the same receiver
+			go send(c, cl, m, ss, ls)
+		}
+	}
+}
+
+// send uses the given client to send a message to a member in the given
+// Cluster, retrying up to 3 times for each message. The given
+// ServerStats and LeaderStats are updated appropriately.
+func send(c *http.Client, cl *Cluster, m raftpb.Message, ss *stats.ServerStats, ls *stats.LeaderStats) {
+	cid := cl.ID()
+	// TODO (xiangli): reasonable retry logic
+	for i := 0; i < 3; i++ {
+		memb := cl.Member(m.To)
+		if memb == nil {
+			// TODO: unknown peer id.. what do we do? I
+			// don't think this should ever happen, need to
+			// look into this further.
+			log.Printf("etcdhttp: no member for %d", m.To)
+			return
+		}
+		u := fmt.Sprintf("%s%s", memb.Pick(), raftPrefix)
+
+		// TODO: don't block. we should be able to have 1000s
+		// of messages out at a time.
+		data, err := m.Marshal()
+		if err != nil {
+			log.Println("etcdhttp: dropping message:", err)
+			return // drop bad message
+		}
+		if m.Type == raftpb.MsgApp {
+			ss.SendAppendReq(len(data))
+		}
+		to := idAsHex(m.To)
+		fs := ls.Follower(to)
+
+		start := time.Now()
+		sent := httpPost(c, u, cid, data)
+		end := time.Now()
+		if sent {
+			fs.Succ(end.Sub(start))
+			return
+		}
+		fs.Fail()
+		// TODO: backoff
+	}
+}
+
+// httpPost POSTs a data payload to a url using the given client. Returns true
+// if the POST succeeds, false on any failure.
+func httpPost(c *http.Client, url string, cid uint64, data []byte) bool {
+	req, err := http.NewRequest("POST", url, bytes.NewBuffer(data))
+	if err != nil {
+		// TODO: log the error?
+		return false
+	}
+	req.Header.Set("Content-Type", "application/protobuf")
+	req.Header.Set("X-Etcd-Cluster-ID", strconv.FormatUint(cid, 16))
+	resp, err := c.Do(req)
+	if err != nil {
+		// TODO: log the error?
+		return false
+	}
+	resp.Body.Close()
+
+	switch resp.StatusCode {
+	case http.StatusPreconditionFailed:
+		// TODO: shutdown the etcdserver gracefully?
+ log.Panicf("clusterID mismatch") + return false + case http.StatusForbidden: + // TODO: stop the server + log.Panicf("the member has been removed") + return false + case http.StatusNoContent: + return true + default: + return false + } +} diff --git a/etcdserver/server.go b/etcdserver/server.go index 0a63099f9..df6546fe6 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -137,10 +137,9 @@ type EtcdServer struct { done chan struct{} stopped chan struct{} id uint64 - clusterID uint64 attributes Attributes - ClusterStore ClusterStore + Cluster *Cluster node raft.Node store store.Store @@ -176,12 +175,12 @@ func NewServer(cfg *ServerConfig) *EtcdServer { st := store.New() var w *wal.WAL var n raft.Node - var id, cid uint64 + var id uint64 if !wal.Exist(cfg.WALDir()) { if err := cfg.VerifyBootstrapConfig(); err != nil { log.Fatalf("etcdserver: %v", err) } - m := cfg.Cluster.FindName(cfg.Name) + m := cfg.Cluster.MemberFromName(cfg.Name) if cfg.ShouldDiscover() { d, err := discovery.New(cfg.DiscoveryURL, m.ID, cfg.Cluster.String()) if err != nil { @@ -191,11 +190,12 @@ func NewServer(cfg *ServerConfig) *EtcdServer { if err != nil { log.Fatalf("etcdserver: %v", err) } - if err = cfg.Cluster.SetMembersFromString(s); err != nil { + if cfg.Cluster, err = NewClusterFromString(cfg.Cluster.name, s); err != nil { log.Fatalf("etcdserver: %v", err) } } - id, cid, n, w = startNode(cfg) + cfg.Cluster.SetStore(st) + id, n, w = startNode(cfg) } else { if cfg.ShouldDiscover() { log.Printf("etcdserver: warn: ignoring discovery: etcd has already been initialized and has a valid log in %q", cfg.WALDir()) @@ -210,11 +210,10 @@ func NewServer(cfg *ServerConfig) *EtcdServer { st.Recovery(snapshot.Data) index = snapshot.Index } - id, cid, n, w = restartNode(cfg, index, snapshot) + cfg.Cluster = NewClusterFromStore(cfg.Cluster.name, st) + id, n, w = restartNode(cfg, index, snapshot) } - cls := &clusterStore{Store: st, id: cid} - sstats := &stats.ServerStats{ Name: cfg.Name, ID: idAsHex(id), @@ -225,19 +224,18 @@ func NewServer(cfg *ServerConfig) *EtcdServer { store: st, node: n, id: id, - clusterID: cid, attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, + Cluster: cfg.Cluster, storage: struct { *wal.WAL *snap.Snapshotter }{w, ss}, - stats: sstats, - lstats: lstats, - send: Sender(cfg.Transport, cls, sstats, lstats), - Ticker: time.Tick(100 * time.Millisecond), - SyncTicker: time.Tick(500 * time.Millisecond), - snapCount: cfg.SnapCount, - ClusterStore: cls, + stats: sstats, + lstats: lstats, + send: Sender(cfg.Transport, cfg.Cluster, sstats, lstats), + Ticker: time.Tick(100 * time.Millisecond), + SyncTicker: time.Tick(500 * time.Millisecond), + snapCount: cfg.SnapCount, } return s } @@ -268,7 +266,7 @@ func (s *EtcdServer) start() { } func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { - if s.ClusterStore.IsRemoved(m.From) { + if s.Cluster.IsMemberRemoved(m.From) { return ErrRemoved } return s.node.Step(ctx, m) @@ -599,22 +597,22 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes []uint64) error s.node.ApplyConfChange(cc) switch cc.Type { case raftpb.ConfChangeAddNode: - var m Member - if err := json.Unmarshal(cc.Context, &m); err != nil { + m := new(Member) + if err := json.Unmarshal(cc.Context, m); err != nil { panic("unexpected unmarshal error") } if cc.NodeID != m.ID { panic("unexpected nodeID mismatch") } - s.ClusterStore.Add(m) + s.Cluster.AddMember(m) case raftpb.ConfChangeRemoveNode: - s.ClusterStore.Remove(cc.NodeID) + 
 	}
 	return nil
 }
 
 func (s *EtcdServer) checkConfChange(cc raftpb.ConfChange, nodes []uint64) error {
-	if s.ClusterStore.IsRemoved(cc.NodeID) {
+	if s.Cluster.IsMemberRemoved(cc.NodeID) {
 		return ErrIDRemoved
 	}
 	switch cc.Type {
@@ -644,11 +642,11 @@ func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) {
 	s.storage.Cut()
 }
 
-func startNode(cfg *ServerConfig) (id, cid uint64, n raft.Node, w *wal.WAL) {
+func startNode(cfg *ServerConfig) (id uint64, n raft.Node, w *wal.WAL) {
 	var err error
 	// TODO: remove the discoveryURL when it becomes part of the source for
 	// generating nodeID.
-	member := cfg.Cluster.FindName(cfg.Name)
+	member := cfg.Cluster.MemberFromName(cfg.Name)
 	cfg.Cluster.GenID([]byte(cfg.DiscoveryURL))
 	metadata := pbutil.MustMarshal(&pb.Metadata{NodeID: member.ID, ClusterID: cfg.Cluster.ID()})
 	if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
@@ -657,19 +655,19 @@ func startNode(cfg *ServerConfig) (id uint64, n raft.Node, w *wal.WAL) {
 	ids := cfg.Cluster.MemberIDs()
 	peers := make([]raft.Peer, len(ids))
 	for i, id := range ids {
-		ctx, err := json.Marshal((*cfg.Cluster).FindID(id))
+		ctx, err := json.Marshal((*cfg.Cluster).Member(id))
 		if err != nil {
 			log.Fatal(err)
 		}
 		peers[i] = raft.Peer{ID: id, Context: ctx}
 	}
-	id, cid = member.ID, cfg.Cluster.ID()
-	log.Printf("etcdserver: start node %d in cluster %d", id, cid)
-	n = raft.StartNode(member.ID, peers, 10, 1)
+	id = member.ID
+	log.Printf("etcdserver: start node %x in cluster %x", id, cfg.Cluster.ID())
+	n = raft.StartNode(id, peers, 10, 1)
 	return
 }
 
-func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id, cid uint64, n raft.Node, w *wal.WAL) {
+func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id uint64, n raft.Node, w *wal.WAL) {
 	var err error
 	// restart a node from previous wal
 	if w, err = wal.OpenAtIndex(cfg.WALDir(), index); err != nil {
@@ -682,8 +680,9 @@ func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id
 	var metadata pb.Metadata
 	pbutil.MustUnmarshal(&metadata, wmetadata)
-	id, cid = metadata.NodeID, metadata.ClusterID
-	log.Printf("etcdserver: restart member %x in cluster %x at commit index %d", id, cid, st.Commit)
+	id = metadata.NodeID
+	cfg.Cluster.SetID(metadata.ClusterID)
+	log.Printf("etcdserver: restart member %x in cluster %x at commit index %d", id, cfg.Cluster.ID(), st.Commit)
 	n = raft.RestartNode(id, 10, 1, snapshot, st, ents)
 	return
 }
diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go
index 6c5a10bc0..26d7e807a 100644
--- a/etcdserver/server_test.go
+++ b/etcdserver/server_test.go
@@ -423,10 +423,10 @@ func TestApplyConfChangeError(t *testing.T) {
 	}
 	for i, tt := range tests {
 		n := &nodeRecorder{}
-		cs := &removedClusterStore{removed: removed}
+		cl := &Cluster{removed: removed}
 		srv := &EtcdServer{
-			node:         n,
-			ClusterStore: cs,
+			node:    n,
+			Cluster: cl,
 		}
 		err := srv.applyConfChange(tt.cc, nodes)
 		if err != tt.werr {
@@ -471,13 +471,15 @@ func testServer(t *testing.T, ns uint64) {
 		n := raft.StartNode(id, members, 10, 1)
 		tk := time.NewTicker(10 * time.Millisecond)
 		defer tk.Stop()
+		cl := newCluster("abc")
+		cl.SetStore(&storeRecorder{})
 		srv := &EtcdServer{
-			node:         n,
-			store:        store.New(),
-			send:         send,
-			storage:      &storageRecorder{},
-			Ticker:       tk.C,
-			ClusterStore: &clusterStoreRecorder{},
+			node:    n,
+			store:   store.New(),
+			send:    send,
+			storage: &storageRecorder{},
+			Ticker:  tk.C,
+			Cluster: cl,
 		}
 		srv.start()
 		ss[i] = srv
@@ -538,13 +540,15 @@ func TestDoProposal(t *testing.T) {
 	tk := make(chan time.Time)
 	// this makes <-tk always successful, which accelerates internal clock
 	close(tk)
+	cl := newCluster("abc")
+	cl.SetStore(&storeRecorder{})
 	srv := &EtcdServer{
-		node:         n,
-		store:        st,
-		send:         func(_ []raftpb.Message) {},
-		storage:      &storageRecorder{},
-		Ticker:       tk,
-		ClusterStore: &clusterStoreRecorder{},
+		node:    n,
+		store:   st,
+		send:    func(_ []raftpb.Message) {},
+		storage: &storageRecorder{},
+		Ticker:  tk,
+		Cluster: cl,
 	}
 	srv.start()
 	resp, err := srv.Do(ctx, tt)
@@ -782,12 +786,12 @@ func TestTriggerSnap(t *testing.T) {
 	st := &storeRecorder{}
 	p := &storageRecorder{}
 	s := &EtcdServer{
-		store:        st,
-		send:         func(_ []raftpb.Message) {},
-		storage:      p,
-		node:         n,
-		snapCount:    10,
-		ClusterStore: &clusterStoreRecorder{},
+		store:     st,
+		send:      func(_ []raftpb.Message) {},
+		storage:   p,
+		node:      n,
+		snapCount: 10,
+		Cluster:   &Cluster{},
 	}
 	s.start()
@@ -872,19 +876,20 @@ func TestAddMember(t *testing.T) {
 	n.readyc <- raft.Ready{
 		SoftState: &raft.SoftState{
 			RaftState: raft.StateLeader,
-			Nodes:     []uint64{2, 3},
+			Nodes:     []uint64{2345, 3456},
 		},
 	}
-	cs := &clusterStoreRecorder{}
+	cl := newTestCluster(nil)
+	cl.SetStore(&storeRecorder{})
 	s := &EtcdServer{
-		node:         n,
-		store:        &storeRecorder{},
-		send:         func(_ []raftpb.Message) {},
-		storage:      &storageRecorder{},
-		ClusterStore: cs,
+		node:    n,
+		store:   &storeRecorder{},
+		send:    func(_ []raftpb.Message) {},
+		storage: &storageRecorder{},
+		Cluster: cl,
 	}
 	s.start()
-	m := Member{ID: 1, RaftAttributes: RaftAttributes{PeerURLs: []string{"foo"}}}
+	m := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"foo"}}}
 	err := s.AddMember(context.TODO(), m)
 	gaction := n.Action()
 	s.Stop()
@@ -896,9 +901,8 @@ func TestAddMember(t *testing.T) {
 	if !reflect.DeepEqual(gaction, wactions) {
 		t.Errorf("action = %v, want %v", gaction, wactions)
 	}
-	wcsactions := []action{{name: "Add", params: []interface{}{m}}}
-	if g := cs.Action(); !reflect.DeepEqual(g, wcsactions) {
-		t.Errorf("csaction = %v, want %v", g, wcsactions)
+	if cl.Member(1234) == nil {
+		t.Errorf("member with id 1234 is not added")
 	}
 }
 
@@ -908,20 +912,20 @@ func TestRemoveMember(t *testing.T) {
 	n.readyc <- raft.Ready{
 		SoftState: &raft.SoftState{
 			RaftState: raft.StateLeader,
-			Nodes:     []uint64{1, 2, 3},
+			Nodes:     []uint64{1234, 2345, 3456},
 		},
 	}
-	cs := &clusterStoreRecorder{}
+	cl := newTestCluster([]Member{{ID: 1234}})
+	cl.SetStore(&storeRecorder{})
 	s := &EtcdServer{
-		node:         n,
-		store:        &storeRecorder{},
-		send:         func(_ []raftpb.Message) {},
-		storage:      &storageRecorder{},
-		ClusterStore: cs,
+		node:    n,
+		store:   &storeRecorder{},
+		send:    func(_ []raftpb.Message) {},
+		storage: &storageRecorder{},
+		Cluster: cl,
 	}
 	s.start()
-	id := uint64(1)
-	err := s.RemoveMember(context.TODO(), id)
+	err := s.RemoveMember(context.TODO(), 1234)
 	gaction := n.Action()
 	s.Stop()
 
@@ -932,9 +936,8 @@ func TestRemoveMember(t *testing.T) {
 	if !reflect.DeepEqual(gaction, wactions) {
 		t.Errorf("action = %v, want %v", gaction, wactions)
 	}
-	wcsactions := []action{{name: "Remove", params: []interface{}{id}}}
-	if g := cs.Action(); !reflect.DeepEqual(g, wcsactions) {
-		t.Errorf("csaction = %v, want %v", g, wcsactions)
+	if cl.Member(1234) != nil {
+		t.Errorf("member with id 1234 is not removed")
 	}
 }
diff --git a/integration/cluster_test.go b/integration/cluster_test.go
index b4cfd78cc..8742fedf4 100644
--- a/integration/cluster_test.go
+++ b/integration/cluster_test.go
@@ -85,7 +85,7 @@ func (c *cluster) Launch(t *testing.T) {
 	}
 
 	lns := make([]net.Listener, c.Size)
-	clusterCfg := etcdserver.NewCluster(clusterName)
+	infos := make([]etcdserver.MemberInfo, c.Size)
 	for i := 0; i < c.Size; i++ {
 		l := newLocalListener(t)
 		// each member claims only one peer listener
@@ -94,9 +94,7 @@ func (c *cluster) Launch(t *testing.T) {
 		if err != nil {
 			t.Fatal(err)
 		}
-		if _, err = clusterCfg.AddMemberFromURLs(c.name(i), listenURLs); err != nil {
-			t.Fatal(err)
-		}
+		infos[i] = etcdserver.MemberInfo{Name: c.name(i), PeerURLs: listenURLs}
 	}
 
 	var err error
@@ -114,7 +112,10 @@ func (c *cluster) Launch(t *testing.T) {
 		if err != nil {
 			t.Fatal(err)
 		}
-		m.Cluster = clusterCfg
+		m.Cluster, err = etcdserver.NewClusterFromMemberInfos(clusterName, infos)
+		if err != nil {
+			t.Fatal(err)
+		}
 		m.ClusterState = etcdserver.ClusterStateValueNew
 		m.Transport, err = transport.NewTransport(transport.TLSInfo{})
 		if err != nil {
diff --git a/main.go b/main.go
index 06871dd58..aa0702791 100644
--- a/main.go
+++ b/main.go
@@ -262,7 +262,6 @@ func startProxy() {
 
 // setupCluster sets up the cluster definition for bootstrap or discovery.
 func setupCluster() error {
-	cluster = etcdserver.NewCluster(*initialClusterName)
 	set := make(map[string]bool)
 	fs.Visit(func(f *flag.Flag) {
 		set[f.Name] = true
@@ -275,17 +274,17 @@ func setupCluster() error {
 		return err
 	}
 
+	err = nil
 	switch {
 	case set["discovery"]:
-		cluster = etcdserver.NewCluster(*durl)
-		_, err := cluster.AddMemberFromURLs(*name, apurls)
-		return err
+		infos := []etcdserver.MemberInfo{{Name: *name, PeerURLs: apurls}}
+		cluster, err = etcdserver.NewClusterFromMemberInfos(*durl, infos)
 	case set["initial-cluster"]:
 		fallthrough
 	default:
 		// We're statically configured, and cluster has appropriately been set.
 		// Try to configure by indexing the static cluster by name.
-		cluster.SetMembersFromString(*initialCluster)
+		cluster, err = etcdserver.NewClusterFromString(*initialClusterName, *initialCluster)
 	}
-	return nil
+	return err
 }
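// ---- editor's note (illustrative sketch, not part of the patch) ----
// After this change the HTTP handlers depend only on the narrow ClusterInfo
// interface (ID and ClientURLs), so a test double stays tiny, as fakeCluster
// in http_test.go shows. A self-contained sketch of such a stand-in:
//
//	type stubCluster struct {
//		id   uint64
//		urls []string
//	}
//
//	func (c *stubCluster) ID() uint64           { return c.id }
//	func (c *stubCluster) ClientURLs() []string { return c.urls }
//
// Anything satisfying etcdserver.ClusterInfo can back serverHandler's
// clusterInfo field in tests, just as *Cluster does in production.
// ---------------------------------------------------------------------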