etcdserver: refactor cluster and clusterStore

Integrate clusterStore into cluster, and let cluster become the source of
cluster info.
This commit is contained in:
Yicheng Qin 2014-10-22 16:15:37 -07:00
parent 16c9970a03
commit 89572b5fd7
15 changed files with 735 additions and 926 deletions

View File

@ -19,28 +19,129 @@ package etcdserver
import (
"crypto/sha1"
"encoding/binary"
"encoding/json"
"fmt"
"math/rand"
"log"
"net/url"
"sort"
"strings"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/pkg/flags"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/store"
)
const (
raftAttributesSuffix = "/raftAttributes"
attributesSuffix = "/attributes"
)
type ClusterInfo interface {
ID() uint64
ClientURLs() []string
}
// Cluster is a list of Members that belong to the same raft cluster
type Cluster struct {
id uint64
name string
members map[uint64]*Member
removed map[uint64]bool
store store.Store
}
func NewCluster(clusterName string) *Cluster {
return &Cluster{name: clusterName, members: make(map[uint64]*Member)}
// NewClusterFromString returns Cluster through given clusterName and parsing
// members from a sets of names to IPs discovery formatted like:
// mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach0=http://1.1.1.1,mach1=http://2.2.2.2,mach1=http://3.3.3.3
func NewClusterFromString(name string, cluster string) (*Cluster, error) {
c := newCluster(name)
v, err := url.ParseQuery(strings.Replace(cluster, ",", "&", -1))
if err != nil {
return nil, err
}
for name, urls := range v {
if len(urls) == 0 || urls[0] == "" {
return nil, fmt.Errorf("Empty URL given for %q", name)
}
m := NewMember(name, types.URLs(*flags.NewURLsValue(strings.Join(urls, ","))), c.name, nil)
if _, ok := c.members[m.ID]; ok {
return nil, fmt.Errorf("Member exists with identical ID %v", m)
}
c.members[m.ID] = m
}
return c, nil
}
func (c Cluster) FindName(name string) *Member {
type MemberInfo struct {
Name string
PeerURLs types.URLs
}
// NewClusterFromMembers returns Cluster with the given members.
func NewClusterFromMemberInfos(name string, infos []MemberInfo) (*Cluster, error) {
c := newCluster(name)
for _, info := range infos {
m := NewMember(info.Name, info.PeerURLs, c.name, nil)
if _, ok := c.members[m.ID]; ok {
return nil, fmt.Errorf("Member exists with identical ID %v", m)
}
c.members[m.ID] = m
}
return c, nil
}
func NewClusterFromStore(name string, st store.Store) *Cluster {
c := newCluster(name)
c.store = st
e, err := c.store.Get(storeMembersPrefix, true, true)
if err != nil {
if isKeyNotFound(err) {
return c
}
log.Panicf("get member should never fail: %v", err)
}
for _, n := range e.Node.Nodes {
m, err := nodeToMember(n)
if err != nil {
log.Panicf("unexpected nodeToMember error: %v", err)
}
c.members[m.ID] = m
}
e, err = c.store.Get(storeRemovedMembersPrefix, true, true)
if err != nil {
if isKeyNotFound(err) {
return c
}
log.Panicf("get member should never fail: %v", err)
}
for _, n := range e.Node.Nodes {
c.removed[parseMemberID(n.Key)] = true
}
return c
}
func newCluster(name string) *Cluster {
return &Cluster{
name: name,
members: make(map[uint64]*Member),
removed: make(map[uint64]bool),
}
}
func (c Cluster) ID() uint64 { return c.id }
func (c Cluster) Members() map[uint64]*Member { return c.members }
func (c *Cluster) Member(id uint64) *Member {
return c.members[id]
}
func (c *Cluster) MemberFromName(name string) *Member {
for _, m := range c.members {
if m.Name == name {
return m
@ -49,101 +150,6 @@ func (c Cluster) FindName(name string) *Member {
return nil
}
func (c Cluster) FindID(id uint64) *Member {
return c.members[id]
}
func (c Cluster) Add(m Member) error {
if c.FindID(m.ID) != nil {
return fmt.Errorf("Member exists with identical ID %v", m)
}
c.members[m.ID] = &m
return nil
}
func (c *Cluster) AddSlice(mems []Member) error {
for _, m := range mems {
err := c.Add(m)
if err != nil {
return err
}
}
return nil
}
// Pick chooses a random address from a given Member's addresses, and returns it as
// an addressible URI. If the given member does not exist, an empty string is returned.
func (c Cluster) Pick(id uint64) string {
if m := c.FindID(id); m != nil {
urls := m.PeerURLs
if len(urls) == 0 {
return ""
}
return urls[rand.Intn(len(urls))]
}
return ""
}
// SetMembersFromString parses a sets of names to IPs either from the command line or discovery formatted like:
// mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach0=http://1.1.1.1,mach1=http://2.2.2.2,mach1=http://3.3.3.3
func (c *Cluster) SetMembersFromString(s string) error {
c.members = make(map[uint64]*Member)
v, err := url.ParseQuery(strings.Replace(s, ",", "&", -1))
if err != nil {
return err
}
for name, urls := range v {
if len(urls) == 0 || urls[0] == "" {
return fmt.Errorf("Empty URL given for %q", name)
}
m := NewMember(name, types.URLs(*flags.NewURLsValue(strings.Join(urls, ","))), c.name, nil)
err := c.Add(*m)
if err != nil {
return err
}
}
return nil
}
func (c *Cluster) AddMemberFromURLs(name string, urls types.URLs) (*Member, error) {
m := NewMember(name, urls, c.name, nil)
err := c.Add(*m)
if err != nil {
return nil, err
}
return m, nil
}
func (c *Cluster) GenID(salt []byte) {
mIDs := c.MemberIDs()
b := make([]byte, 8*len(mIDs))
for i, id := range mIDs {
binary.BigEndian.PutUint64(b[8*i:], id)
}
b = append(b, salt...)
hash := sha1.Sum(b)
c.id = binary.BigEndian.Uint64(hash[:8])
}
func (c Cluster) String() string {
sl := []string{}
for _, m := range c.members {
for _, u := range m.PeerURLs {
sl = append(sl, fmt.Sprintf("%s=%s", m.Name, u))
}
}
sort.Strings(sl)
return strings.Join(sl, ",")
}
func (c Cluster) ID() uint64 { return c.id }
func (c Cluster) Members() map[uint64]*Member { return c.members }
func (c Cluster) MemberIDs() []uint64 {
var ids []uint64
for _, m := range c.members {
@ -153,6 +159,10 @@ func (c Cluster) MemberIDs() []uint64 {
return ids
}
func (c *Cluster) IsMemberRemoved(id uint64) bool {
return c.removed[id]
}
// PeerURLs returns a list of all peer addresses. Each address is prefixed
// with the scheme (currently "http://"). The returned list is sorted in
// ascending lexicographical order.
@ -180,3 +190,93 @@ func (c Cluster) ClientURLs() []string {
sort.Strings(urls)
return urls
}
func (c Cluster) String() string {
sl := []string{}
for _, m := range c.members {
for _, u := range m.PeerURLs {
sl = append(sl, fmt.Sprintf("%s=%s", m.Name, u))
}
}
sort.Strings(sl)
return strings.Join(sl, ",")
}
func (c *Cluster) GenID(salt []byte) {
mIDs := c.MemberIDs()
b := make([]byte, 8*len(mIDs))
for i, id := range mIDs {
binary.BigEndian.PutUint64(b[8*i:], id)
}
b = append(b, salt...)
hash := sha1.Sum(b)
c.id = binary.BigEndian.Uint64(hash[:8])
}
func (c *Cluster) SetID(id uint64) {
c.id = id
}
func (c *Cluster) SetStore(st store.Store) {
c.store = st
}
// AddMember puts a new Member into the store.
// A Member with a matching id must not exist.
func (c *Cluster) AddMember(m *Member) {
b, err := json.Marshal(m.RaftAttributes)
if err != nil {
log.Panicf("marshal error: %v", err)
}
if _, err := c.store.Create(memberStoreKey(m.ID)+raftAttributesSuffix, false, string(b), false, store.Permanent); err != nil {
log.Panicf("add raftAttributes should never fail: %v", err)
}
b, err = json.Marshal(m.Attributes)
if err != nil {
log.Panicf("marshal error: %v", err)
}
if _, err := c.store.Create(memberStoreKey(m.ID)+attributesSuffix, false, string(b), false, store.Permanent); err != nil {
log.Panicf("add attributes should never fail: %v", err)
}
c.members[m.ID] = m
}
// RemoveMember removes a member from the store.
// The given id MUST exist.
func (c *Cluster) RemoveMember(id uint64) {
if _, err := c.store.Delete(memberStoreKey(id), true, true); err != nil {
log.Panicf("delete peer should never fail: %v", err)
}
delete(c.members, id)
if _, err := c.store.Create(removedMemberStoreKey(id), false, "", false, store.Permanent); err != nil {
log.Panicf("creating RemovedMember should never fail: %v", err)
}
c.removed[id] = true
}
// nodeToMember builds member through a store node.
// the child nodes of the given node should be sorted by key.
func nodeToMember(n *store.NodeExtern) (*Member, error) {
m := &Member{ID: parseMemberID(n.Key)}
if len(n.Nodes) != 2 {
return m, fmt.Errorf("len(nodes) = %d, want 2", len(n.Nodes))
}
if w := n.Key + attributesSuffix; n.Nodes[0].Key != w {
return m, fmt.Errorf("key = %v, want %v", n.Nodes[0].Key, w)
}
if err := json.Unmarshal([]byte(*n.Nodes[0].Value), &m.Attributes); err != nil {
return m, fmt.Errorf("unmarshal attributes error: %v", err)
}
if w := n.Key + raftAttributesSuffix; n.Nodes[1].Key != w {
return m, fmt.Errorf("key = %v, want %v", n.Nodes[1].Key, w)
}
if err := json.Unmarshal([]byte(*n.Nodes[1].Value), &m.RaftAttributes); err != nil {
return m, fmt.Errorf("unmarshal raftAttributes error: %v", err)
}
return m, nil
}
func isKeyNotFound(err error) bool {
e, ok := err.(*etcdErr.Error)
return ok && e.ErrorCode == etcdErr.EcodeKeyNotFound
}

View File

@ -1,240 +0,0 @@
/*
Copyright 2014 CoreOS, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcdserver
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"strconv"
"time"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/store"
)
const (
raftPrefix = "/raft"
raftAttributesSuffix = "/raftAttributes"
attributesSuffix = "/attributes"
)
type ClusterStore interface {
Add(m Member)
Get() Cluster
Remove(id uint64)
IsRemoved(id uint64) bool
}
type clusterStore struct {
Store store.Store
// TODO: write the id into the actual store?
// TODO: save the id as string?
id uint64
clusterName string
}
// Add puts a new Member into the store.
// A Member with a matching id must not exist.
func (s *clusterStore) Add(m Member) {
b, err := json.Marshal(m.RaftAttributes)
if err != nil {
log.Panicf("marshal error: %v", err)
}
if _, err := s.Store.Create(memberStoreKey(m.ID)+raftAttributesSuffix, false, string(b), false, store.Permanent); err != nil {
log.Panicf("add raftAttributes should never fail: %v", err)
}
b, err = json.Marshal(m.Attributes)
if err != nil {
log.Panicf("marshal error: %v", err)
}
if _, err := s.Store.Create(memberStoreKey(m.ID)+attributesSuffix, false, string(b), false, store.Permanent); err != nil {
log.Panicf("add attributes should never fail: %v", err)
}
}
// TODO(philips): keep the latest copy without going to the store to avoid the
// lock here.
func (s *clusterStore) Get() Cluster {
c := NewCluster(s.clusterName)
c.id = s.id
e, err := s.Store.Get(storeMembersPrefix, true, true)
if err != nil {
if isKeyNotFound(err) {
return *c
}
log.Panicf("get member should never fail: %v", err)
}
for _, n := range e.Node.Nodes {
m, err := nodeToMember(n)
if err != nil {
log.Panicf("unexpected nodeToMember error: %v", err)
}
if err := c.Add(m); err != nil {
log.Panicf("add member to cluster should never fail: %v", err)
}
}
return *c
}
// nodeToMember builds member through a store node.
// the child nodes of the given node should be sorted by key.
func nodeToMember(n *store.NodeExtern) (Member, error) {
m := Member{ID: parseMemberID(n.Key)}
if len(n.Nodes) != 2 {
return m, fmt.Errorf("len(nodes) = %d, want 2", len(n.Nodes))
}
if w := n.Key + attributesSuffix; n.Nodes[0].Key != w {
return m, fmt.Errorf("key = %v, want %v", n.Nodes[0].Key, w)
}
if err := json.Unmarshal([]byte(*n.Nodes[0].Value), &m.Attributes); err != nil {
return m, fmt.Errorf("unmarshal attributes error: %v", err)
}
if w := n.Key + raftAttributesSuffix; n.Nodes[1].Key != w {
return m, fmt.Errorf("key = %v, want %v", n.Nodes[1].Key, w)
}
if err := json.Unmarshal([]byte(*n.Nodes[1].Value), &m.RaftAttributes); err != nil {
return m, fmt.Errorf("unmarshal raftAttributes error: %v", err)
}
return m, nil
}
// Remove removes a member from the store.
// The given id MUST exist.
func (s *clusterStore) Remove(id uint64) {
if _, err := s.Store.Delete(memberStoreKey(id), true, true); err != nil {
log.Panicf("delete peer should never fail: %v", err)
}
if _, err := s.Store.Create(removedMemberStoreKey(id), false, "", false, store.Permanent); err != nil {
log.Panicf("creating RemovedMember should never fail: %v", err)
}
}
func (s *clusterStore) IsRemoved(id uint64) bool {
_, err := s.Store.Get(removedMemberStoreKey(id), false, false)
switch {
case err == nil:
return true
case isKeyNotFound(err):
return false
default:
log.Panicf("unexpected error when getting removed member %x: %v", id, err)
return false
}
}
// Sender creates the default production sender used to transport raft messages
// in the cluster. The returned sender will update the given ServerStats and
// LeaderStats appropriately.
func Sender(t *http.Transport, cls ClusterStore, ss *stats.ServerStats, ls *stats.LeaderStats) func(msgs []raftpb.Message) {
c := &http.Client{Transport: t}
return func(msgs []raftpb.Message) {
for _, m := range msgs {
// TODO: reuse go routines
// limit the number of outgoing connections for the same receiver
go send(c, cls, m, ss, ls)
}
}
}
// send uses the given client to send a message to a member in the given
// ClusterStore, retrying up to 3 times for each message. The given
// ServerStats and LeaderStats are updated appropriately
func send(c *http.Client, cls ClusterStore, m raftpb.Message, ss *stats.ServerStats, ls *stats.LeaderStats) {
cid := cls.Get().ID()
// TODO (xiangli): reasonable retry logic
for i := 0; i < 3; i++ {
u := cls.Get().Pick(m.To)
if u == "" {
// TODO: unknown peer id.. what do we do? I
// don't think his should ever happen, need to
// look into this further.
log.Printf("etcdhttp: no addr for %d", m.To)
return
}
u = fmt.Sprintf("%s%s", u, raftPrefix)
// TODO: don't block. we should be able to have 1000s
// of messages out at a time.
data, err := m.Marshal()
if err != nil {
log.Println("etcdhttp: dropping message:", err)
return // drop bad message
}
if m.Type == raftpb.MsgApp {
ss.SendAppendReq(len(data))
}
to := idAsHex(m.To)
fs := ls.Follower(to)
start := time.Now()
sent := httpPost(c, u, cid, data)
end := time.Now()
if sent {
fs.Succ(end.Sub(start))
return
}
fs.Fail()
// TODO: backoff
}
}
// httpPost POSTs a data payload to a url using the given client. Returns true
// if the POST succeeds, false on any failure.
func httpPost(c *http.Client, url string, cid uint64, data []byte) bool {
req, err := http.NewRequest("POST", url, bytes.NewBuffer(data))
if err != nil {
// TODO: log the error?
return false
}
req.Header.Set("Content-Type", "application/protobuf")
req.Header.Set("X-Etcd-Cluster-ID", strconv.FormatUint(cid, 16))
resp, err := c.Do(req)
if err != nil {
// TODO: log the error?
return false
}
resp.Body.Close()
switch resp.StatusCode {
case http.StatusPreconditionFailed:
// TODO: shutdown the etcdserver gracefully?
log.Fatalf("etcd: conflicting cluster ID with the target cluster (%s != %s). Exiting.", resp.Header.Get("X-Etcd-Cluster-ID"), strconv.FormatUint(cid, 16))
return false
case http.StatusForbidden:
// TODO: stop the server
log.Fatalf("etcd: this member has been permanently removed from the cluster. Exiting.")
return false
case http.StatusNoContent:
return true
default:
return false
}
}
func isKeyNotFound(err error) bool {
e, ok := err.(*etcdErr.Error)
return ok && e.ErrorCode == etcdErr.EcodeKeyNotFound
}

View File

@ -1,239 +0,0 @@
/*
Copyright 2014 CoreOS, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcdserver
import (
"path"
"reflect"
"testing"
"time"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/store"
)
func TestClusterStoreAdd(t *testing.T) {
st := &storeRecorder{}
ps := &clusterStore{Store: st}
ps.Add(newTestMember(1, nil, "node1", nil))
wactions := []action{
{
name: "Create",
params: []interface{}{
path.Join(storeMembersPrefix, "1", "raftAttributes"),
false,
`{"PeerURLs":null}`,
false,
store.Permanent,
},
},
{
name: "Create",
params: []interface{}{
path.Join(storeMembersPrefix, "1", "attributes"),
false,
`{"Name":"node1"}`,
false,
store.Permanent,
},
},
}
if g := st.Action(); !reflect.DeepEqual(g, wactions) {
t.Errorf("actions = %v, want %v", g, wactions)
}
}
func TestClusterStoreGet(t *testing.T) {
tests := []struct {
mems []Member
wmems []Member
}{
{
[]Member{newTestMember(1, nil, "node1", nil)},
[]Member{newTestMember(1, nil, "node1", nil)},
},
{
[]Member{},
[]Member{},
},
{
[]Member{
newTestMember(1, nil, "node1", nil),
newTestMember(2, nil, "node2", nil),
},
[]Member{
newTestMember(1, nil, "node1", nil),
newTestMember(2, nil, "node2", nil),
},
},
{
[]Member{
newTestMember(2, nil, "node2", nil),
newTestMember(1, nil, "node1", nil),
},
[]Member{
newTestMember(1, nil, "node1", nil),
newTestMember(2, nil, "node2", nil),
},
},
}
for i, tt := range tests {
c := NewCluster("")
if err := c.AddSlice(tt.mems); err != nil {
t.Fatal(err)
}
c.GenID(nil)
cs := &clusterStore{Store: newGetAllStore(), id: c.id}
for _, m := range tt.mems {
cs.Add(m)
}
if g := cs.Get(); !reflect.DeepEqual(&g, c) {
t.Errorf("#%d: mems = %v, want %v", i, &g, c)
}
}
}
func TestClusterStoreRemove(t *testing.T) {
st := &storeRecorder{}
cs := &clusterStore{Store: st}
cs.Remove(1)
wactions := []action{
{name: "Delete", params: []interface{}{memberStoreKey(1), true, true}},
{name: "Create", params: []interface{}{removedMemberStoreKey(1), false, "", false, store.Permanent}},
}
if !reflect.DeepEqual(st.Action(), wactions) {
t.Errorf("actions = %v, want %v", st.Action(), wactions)
}
}
func TestClusterStoreIsRemovedFalse(t *testing.T) {
st := &errStoreRecorder{err: etcdErr.NewError(etcdErr.EcodeKeyNotFound, "", 0)}
cs := clusterStore{Store: st}
if ok := cs.IsRemoved(1); ok != false {
t.Errorf("IsRemoved = %v, want %v", ok, false)
}
}
func TestClusterStoreIsRemovedTrue(t *testing.T) {
st := &storeRecorder{}
cs := &clusterStore{Store: st}
if ok := cs.IsRemoved(1); ok != true {
t.Errorf("IsRemoved = %v, want %v", ok, true)
}
wactions := []action{
{name: "Get", params: []interface{}{removedMemberStoreKey(1), false, false}},
}
if !reflect.DeepEqual(st.Action(), wactions) {
t.Errorf("actions = %v, want %v", st.Action(), wactions)
}
}
func TestNodeToMemberFail(t *testing.T) {
tests := []*store.NodeExtern{
{Key: "/1234", Nodes: []*store.NodeExtern{
{Key: "/1234/strange"},
}},
{Key: "/1234", Nodes: []*store.NodeExtern{
{Key: "/1234/dynamic", Value: stringp("garbage")},
}},
{Key: "/1234", Nodes: []*store.NodeExtern{
{Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)},
}},
{Key: "/1234", Nodes: []*store.NodeExtern{
{Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)},
{Key: "/1234/strange"},
}},
{Key: "/1234", Nodes: []*store.NodeExtern{
{Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)},
{Key: "/1234/static", Value: stringp("garbage")},
}},
{Key: "/1234", Nodes: []*store.NodeExtern{
{Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)},
{Key: "/1234/static", Value: stringp(`{"Name":"node1","ClientURLs":null}`)},
{Key: "/1234/strange"},
}},
}
for i, tt := range tests {
if _, err := nodeToMember(tt); err == nil {
t.Errorf("#%d: unexpected nil error", i)
}
}
}
func TestNodeToMember(t *testing.T) {
n := &store.NodeExtern{Key: "/1234", Nodes: []*store.NodeExtern{
{Key: "/1234/attributes", Value: stringp(`{"Name":"node1","ClientURLs":null}`)},
{Key: "/1234/raftAttributes", Value: stringp(`{"PeerURLs":null}`)},
}}
wm := Member{ID: 0x1234, RaftAttributes: RaftAttributes{}, Attributes: Attributes{Name: "node1"}}
m, err := nodeToMember(n)
if err != nil {
t.Fatalf("unexpected nodeToMember error: %v", err)
}
if !reflect.DeepEqual(m, wm) {
t.Errorf("member = %+v, want %+v", m, wm)
}
}
// simpleStore implements basic create and get.
type simpleStore struct {
storeRecorder
st map[string]string
}
func (s *simpleStore) Create(key string, _ bool, value string, _ bool, _ time.Time) (*store.Event, error) {
if s.st == nil {
s.st = make(map[string]string)
}
s.st[key] = value
return nil, nil
}
func (s *simpleStore) Get(key string, _, _ bool) (*store.Event, error) {
val, ok := s.st[key]
if !ok {
return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, "", 0)
}
ev := &store.Event{Node: &store.NodeExtern{Key: key, Value: stringp(val)}}
return ev, nil
}
// getAllStore embeds simpleStore, and makes Get return all keys sorted.
// It uses real store because it uses lots of logic in store and is not easy
// to mock.
// TODO: use mock one to do testing
type getAllStore struct {
store.Store
}
func newGetAllStore() *getAllStore {
return &getAllStore{store.New()}
}
func newTestMember(id uint64, peerURLs []string, name string, clientURLs []string) Member {
return Member{
ID: id,
RaftAttributes: RaftAttributes{PeerURLs: peerURLs},
Attributes: Attributes{Name: name, ClientURLs: clientURLs},
}
}
func newTestMemberp(id uint64, peerURLs []string, name string, clientURLs []string) *Member {
m := newTestMember(id, peerURLs, name, clientURLs)
return &m
}

View File

@ -17,156 +17,14 @@
package etcdserver
import (
"path"
"reflect"
"testing"
"github.com/coreos/etcd/store"
)
func TestClusterAddSlice(t *testing.T) {
tests := []struct {
mems []Member
want *Cluster
}{
{
[]Member{},
NewCluster(""),
},
{
[]Member{
newTestMember(1, []string{"foo", "bar"}, "", nil),
newTestMember(2, []string{"baz"}, "", nil),
},
&Cluster{
members: map[uint64]*Member{
1: newTestMemberp(1, []string{"foo", "bar"}, "", nil),
2: newTestMemberp(2, []string{"baz"}, "", nil),
},
},
},
}
for i, tt := range tests {
c := NewCluster("")
if err := c.AddSlice(tt.mems); err != nil {
t.Errorf("#%d: err=%#v, want nil", i, err)
continue
}
if !reflect.DeepEqual(c, tt.want) {
t.Errorf("#%d: c=%#v, want %#v", i, c, tt.want)
}
}
}
func TestClusterAddSliceBad(t *testing.T) {
c := Cluster{
members: map[uint64]*Member{
1: newTestMemberp(1, nil, "", nil),
},
}
if err := c.AddSlice([]Member{newTestMember(1, nil, "", nil)}); err == nil {
t.Error("want err, but got nil")
}
}
func TestClusterPick(t *testing.T) {
cs := Cluster{
members: map[uint64]*Member{
1: newTestMemberp(1, []string{"abc", "def", "ghi", "jkl", "mno", "pqr", "stu"}, "", nil),
2: newTestMemberp(2, []string{"xyz"}, "", nil),
3: newTestMemberp(3, []string{}, "", nil),
},
}
ids := map[string]bool{
"abc": true,
"def": true,
"ghi": true,
"jkl": true,
"mno": true,
"pqr": true,
"stu": true,
}
for i := 0; i < 1000; i++ {
a := cs.Pick(1)
if !ids[a] {
t.Errorf("returned ID %q not in expected range!", a)
break
}
}
if b := cs.Pick(2); b != "xyz" {
t.Errorf("id=%q, want %q", b, "xyz")
}
if c := cs.Pick(3); c != "" {
t.Errorf("id=%q, want %q", c, "")
}
if d := cs.Pick(4); d != "" {
t.Errorf("id=%q, want %q", d, "")
}
}
func TestClusterFind(t *testing.T) {
tests := []struct {
id uint64
name string
mems []Member
match bool
}{
{
1,
"node1",
[]Member{newTestMember(1, nil, "node1", nil)},
true,
},
{
2,
"foobar",
[]Member{},
false,
},
{
2,
"node2",
[]Member{newTestMember(1, nil, "node1", nil), newTestMember(2, nil, "node2", nil)},
true,
},
{
3,
"node3",
[]Member{newTestMember(1, nil, "node1", nil), newTestMember(2, nil, "node2", nil)},
false,
},
}
for i, tt := range tests {
c := NewCluster("")
c.AddSlice(tt.mems)
m := c.FindID(tt.id)
if m == nil && !tt.match {
continue
}
if m == nil && tt.match {
t.Errorf("#%d: expected match got empty", i)
}
if m.Name != tt.name && tt.match {
t.Errorf("#%d: got = %v, want %v", i, m.Name, tt.name)
}
}
for i, tt := range tests {
c := NewCluster("")
c.AddSlice(tt.mems)
m := c.FindID(tt.id)
if m == nil && !tt.match {
continue
}
if m == nil && tt.match {
t.Errorf("#%d: expected match got empty", i)
}
if m.ID != tt.id && tt.match {
t.Errorf("#%d: got = %v, want %v", i, m.Name, tt.id)
}
}
}
func TestClusterSet(t *testing.T) {
func TestClusterFromString(t *testing.T) {
tests := []struct {
f string
mems []Member
@ -174,54 +32,28 @@ func TestClusterSet(t *testing.T) {
{
"mem1=http://10.0.0.1:2379,mem1=http://128.193.4.20:2379,mem2=http://10.0.0.2:2379,default=http://127.0.0.1:2379",
[]Member{
newTestMember(3736794188555456841, []string{"http://10.0.0.1:2379", "http://128.193.4.20:2379"}, "mem1", nil),
newTestMember(5674507346857578431, []string{"http://10.0.0.2:2379"}, "mem2", nil),
newTestMember(2676999861503984872, []string{"http://127.0.0.1:2379"}, "default", nil),
newTestMember(4322322643958477905, []string{"http://10.0.0.1:2379", "http://128.193.4.20:2379"}, "mem1", nil),
newTestMember(3141198903430435750, []string{"http://10.0.0.2:2379"}, "mem2", nil),
newTestMember(12762790032478827328, []string{"http://127.0.0.1:2379"}, "default", nil),
},
},
}
for i, tt := range tests {
c := NewCluster("")
if err := c.AddSlice(tt.mems); err != nil {
t.Error(err)
c, err := NewClusterFromString("abc", tt.f)
if err != nil {
t.Fatalf("#%d: unexpected new error: %v", i, err)
}
g := Cluster{}
g.SetMembersFromString(tt.f)
if g.String() != c.String() {
t.Errorf("#%d: set = %v, want %v", i, g, c)
if c.name != "abc" {
t.Errorf("#%d: name = %v, want abc", i, c.name)
}
wc := newTestCluster(tt.mems)
if !reflect.DeepEqual(c.members, wc.members) {
t.Errorf("#%d: members = %+v, want %+v", i, c.members, wc.members)
}
}
}
func TestClusterGenID(t *testing.T) {
cs := NewCluster("")
cs.AddSlice([]Member{
newTestMember(1, nil, "", nil),
newTestMember(2, nil, "", nil),
})
cs.GenID(nil)
if cs.ID() == 0 {
t.Fatalf("cluster.ID = %v, want not 0", cs.ID())
}
previd := cs.ID()
cs.Add(newTestMember(3, nil, "", nil))
cs.GenID(nil)
if cs.ID() == previd {
t.Fatalf("cluster.ID = %v, want not %v", cs.ID(), previd)
}
previd = cs.ID()
cs.GenID([]byte("http://discovery.etcd.io/12345678"))
if cs.ID() == previd {
t.Fatalf("cluster.ID = %v, want not %v", cs.ID(), previd)
}
}
func TestClusterSetBad(t *testing.T) {
func TestClusterFromStringBad(t *testing.T) {
tests := []string{
// invalid URL
"%^",
@ -230,41 +62,111 @@ func TestClusterSetBad(t *testing.T) {
"mem1,mem2=http://128.193.4.20:2379,mem3=http://10.0.0.2:2379",
// TODO(philips): anyone know of a 64 bit sha1 hash collision
// "06b2f82fd81b2c20=http://128.193.4.20:2379,02c60cb75083ceef=http://128.193.4.20:2379",
// the same url for two members
"mem1=http://128.193.4.20:2379,mem2=http://128.193.4.20:2379",
}
for i, tt := range tests {
g := NewCluster("")
if err := g.SetMembersFromString(tt); err == nil {
t.Errorf("#%d: set = %v, want err", i, tt)
if _, err := NewClusterFromString("abc", tt); err == nil {
t.Errorf("#%d: unexpected successful new, want err", i)
}
}
}
func TestClusterFromStore(t *testing.T) {
tests := []struct {
mems []Member
}{
{
[]Member{newTestMember(1, nil, "node1", nil)},
},
{
[]Member{},
},
{
[]Member{
newTestMember(1, nil, "node1", nil),
newTestMember(2, nil, "node2", nil),
},
},
}
for i, tt := range tests {
st := store.New()
hc := newTestCluster(nil)
hc.SetStore(st)
for _, m := range tt.mems {
hc.AddMember(&m)
}
c := NewClusterFromStore("abc", st)
if c.name != "abc" {
t.Errorf("#%d: name = %v, want %v", i, c.name, "abc")
}
wc := newTestCluster(tt.mems)
if !reflect.DeepEqual(c.members, wc.members) {
t.Errorf("#%d: members = %v, want %v", i, c.members, wc.members)
}
}
}
func TestClusterMember(t *testing.T) {
membs := []Member{
newTestMember(1, nil, "node1", nil),
newTestMember(2, nil, "node2", nil),
}
tests := []struct {
id uint64
match bool
}{
{1, true},
{2, true},
{3, false},
}
for i, tt := range tests {
c := newTestCluster(membs)
m := c.Member(tt.id)
if g := m != nil; g != tt.match {
t.Errorf("#%d: find member = %v, want %v", i, g, tt.match)
}
if m != nil && m.ID != tt.id {
t.Errorf("#%d: id = %x, want %x", i, m.ID, tt.id)
}
}
}
func TestClusterMemberFromName(t *testing.T) {
membs := []Member{
newTestMember(1, nil, "node1", nil),
newTestMember(2, nil, "node2", nil),
}
tests := []struct {
name string
match bool
}{
{"node1", true},
{"node2", true},
{"node3", false},
}
for i, tt := range tests {
c := newTestCluster(membs)
m := c.MemberFromName(tt.name)
if g := m != nil; g != tt.match {
t.Errorf("#%d: find member = %v, want %v", i, g, tt.match)
}
if m != nil && m.Name != tt.name {
t.Errorf("#%d: name = %v, want %v", i, m.Name, tt.name)
}
}
}
func TestClusterMemberIDs(t *testing.T) {
cs := NewCluster("")
cs.AddSlice([]Member{
c := newTestCluster([]Member{
newTestMember(1, nil, "", nil),
newTestMember(4, nil, "", nil),
newTestMember(100, nil, "", nil),
})
w := []uint64{1, 4, 100}
g := cs.MemberIDs()
g := c.MemberIDs()
if !reflect.DeepEqual(w, g) {
t.Errorf("IDs=%+v, want %+v", g, w)
}
}
func TestClusterAddBad(t *testing.T) {
// Should not be possible to add the same ID multiple times
mems := []Member{
newTestMember(1, nil, "mem1", nil),
newTestMember(1, nil, "mem2", nil),
}
c := NewCluster("")
c.Add(newTestMember(1, nil, "mem1", nil))
for i, m := range mems {
if err := c.Add(m); err == nil {
t.Errorf("#%d: set = %v, want err", i, err)
}
t.Errorf("IDs = %+v, want %+v", g, w)
}
}
@ -315,11 +217,7 @@ func TestClusterPeerURLs(t *testing.T) {
}
for i, tt := range tests {
c := NewCluster("")
if err := c.AddSlice(tt.mems); err != nil {
t.Errorf("AddSlice error: %v", err)
continue
}
c := newTestCluster(tt.mems)
urls := c.PeerURLs()
if !reflect.DeepEqual(urls, tt.wurls) {
t.Errorf("#%d: PeerURLs = %v, want %v", i, urls, tt.wurls)
@ -374,14 +272,152 @@ func TestClusterClientURLs(t *testing.T) {
}
for i, tt := range tests {
c := NewCluster("")
if err := c.AddSlice(tt.mems); err != nil {
t.Errorf("AddSlice error: %v", err)
continue
}
c := newTestCluster(tt.mems)
urls := c.ClientURLs()
if !reflect.DeepEqual(urls, tt.wurls) {
t.Errorf("#%d: ClientURLs = %v, want %v", i, urls, tt.wurls)
}
}
}
func TestClusterGenID(t *testing.T) {
cs := newTestCluster([]Member{
newTestMember(1, nil, "", nil),
newTestMember(2, nil, "", nil),
})
cs.GenID(nil)
if cs.ID() == 0 {
t.Fatalf("cluster.ID = %v, want not 0", cs.ID())
}
previd := cs.ID()
cs.SetStore(&storeRecorder{})
cs.AddMember(newTestMemberp(3, nil, "", nil))
cs.GenID(nil)
if cs.ID() == previd {
t.Fatalf("cluster.ID = %v, want not %v", cs.ID(), previd)
}
previd = cs.ID()
cs.GenID([]byte("http://discovery.etcd.io/12345678"))
if cs.ID() == previd {
t.Fatalf("cluster.ID = %v, want not %v", cs.ID(), previd)
}
}
func TestNodeToMemberBad(t *testing.T) {
tests := []*store.NodeExtern{
{Key: "/1234", Nodes: []*store.NodeExtern{
{Key: "/1234/strange"},
}},
{Key: "/1234", Nodes: []*store.NodeExtern{
{Key: "/1234/dynamic", Value: stringp("garbage")},
}},
{Key: "/1234", Nodes: []*store.NodeExtern{
{Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)},
}},
{Key: "/1234", Nodes: []*store.NodeExtern{
{Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)},
{Key: "/1234/strange"},
}},
{Key: "/1234", Nodes: []*store.NodeExtern{
{Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)},
{Key: "/1234/static", Value: stringp("garbage")},
}},
{Key: "/1234", Nodes: []*store.NodeExtern{
{Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)},
{Key: "/1234/static", Value: stringp(`{"Name":"node1","ClientURLs":null}`)},
{Key: "/1234/strange"},
}},
}
for i, tt := range tests {
if _, err := nodeToMember(tt); err == nil {
t.Errorf("#%d: unexpected nil error", i)
}
}
}
func TestClusterAddMember(t *testing.T) {
st := &storeRecorder{}
c := newTestCluster(nil)
c.SetStore(st)
c.AddMember(newTestMemberp(1, nil, "node1", nil))
wactions := []action{
{
name: "Create",
params: []interface{}{
path.Join(storeMembersPrefix, "1", "raftAttributes"),
false,
`{"PeerURLs":null}`,
false,
store.Permanent,
},
},
{
name: "Create",
params: []interface{}{
path.Join(storeMembersPrefix, "1", "attributes"),
false,
`{"Name":"node1"}`,
false,
store.Permanent,
},
},
}
if g := st.Action(); !reflect.DeepEqual(g, wactions) {
t.Errorf("actions = %v, want %v", g, wactions)
}
}
func TestClusterRemoveMember(t *testing.T) {
st := &storeRecorder{}
c := newTestCluster(nil)
c.SetStore(st)
c.RemoveMember(1)
wactions := []action{
{name: "Delete", params: []interface{}{memberStoreKey(1), true, true}},
{name: "Create", params: []interface{}{removedMemberStoreKey(1), false, "", false, store.Permanent}},
}
if !reflect.DeepEqual(st.Action(), wactions) {
t.Errorf("actions = %v, want %v", st.Action(), wactions)
}
}
func TestNodeToMember(t *testing.T) {
n := &store.NodeExtern{Key: "/1234", Nodes: []*store.NodeExtern{
{Key: "/1234/attributes", Value: stringp(`{"Name":"node1","ClientURLs":null}`)},
{Key: "/1234/raftAttributes", Value: stringp(`{"PeerURLs":null}`)},
}}
wm := &Member{ID: 0x1234, RaftAttributes: RaftAttributes{}, Attributes: Attributes{Name: "node1"}}
m, err := nodeToMember(n)
if err != nil {
t.Fatalf("unexpected nodeToMember error: %v", err)
}
if !reflect.DeepEqual(m, wm) {
t.Errorf("member = %+v, want %+v", m, wm)
}
}
func newTestCluster(membs []Member) *Cluster {
c := &Cluster{members: make(map[uint64]*Member), removed: make(map[uint64]bool)}
for i, m := range membs {
c.members[m.ID] = &membs[i]
}
return c
}
func newTestMember(id uint64, peerURLs []string, name string, clientURLs []string) Member {
return Member{
ID: id,
RaftAttributes: RaftAttributes{PeerURLs: peerURLs},
Attributes: Attributes{Name: name, ClientURLs: clientURLs},
}
}
func newTestMemberp(id uint64, peerURLs []string, name string, clientURLs []string) *Member {
m := newTestMember(id, peerURLs, name, clientURLs)
return &m
}

View File

@ -40,7 +40,7 @@ type ServerConfig struct {
// VerifyBootstrapConfig sanity-checks the initial config and returns an error
// for things that should never happen.
func (c *ServerConfig) VerifyBootstrapConfig() error {
m := c.Cluster.FindName(c.Name)
m := c.Cluster.MemberFromName(c.Name)
// Make sure the cluster at least contains the local server.
if m == nil {
return fmt.Errorf("couldn't find local name %s in the initial cluster configuration", c.Name)

View File

@ -44,8 +44,7 @@ func TestBootstrapConfigVerify(t *testing.T) {
}
for i, tt := range tests {
cluster := &Cluster{}
err := cluster.SetMembersFromString(tt.clusterSetting)
cluster, err := NewClusterFromString("", tt.clusterSetting)
if err != nil && tt.shouldError {
continue
}

View File

@ -59,12 +59,12 @@ var errClosed = errors.New("etcdhttp: client closed connection")
// NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
sh := &serverHandler{
server: server,
clusterStore: server.ClusterStore,
stats: server,
timer: server,
timeout: defaultServerTimeout,
clock: clockwork.NewRealClock(),
server: server,
clusterInfo: server.Cluster,
stats: server,
timer: server,
timeout: defaultServerTimeout,
clock: clockwork.NewRealClock(),
}
mux := http.NewServeMux()
mux.HandleFunc(keysPrefix, sh.serveKeys)
@ -84,10 +84,10 @@ func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
// NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler {
sh := &serverHandler{
server: server,
stats: server,
clusterStore: server.ClusterStore,
clock: clockwork.NewRealClock(),
server: server,
stats: server,
clusterInfo: server.Cluster,
clock: clockwork.NewRealClock(),
}
mux := http.NewServeMux()
mux.HandleFunc(raftPrefix, sh.serveRaft)
@ -97,12 +97,12 @@ func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler {
// serverHandler provides http.Handlers for etcd client and raft communication.
type serverHandler struct {
timeout time.Duration
server etcdserver.Server
stats etcdserver.Stats
timer etcdserver.RaftTimer
clusterStore etcdserver.ClusterStore
clock clockwork.Clock
timeout time.Duration
server etcdserver.Server
stats etcdserver.Stats
timer etcdserver.RaftTimer
clusterInfo etcdserver.ClusterInfo
clock clockwork.Clock
}
func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
@ -145,7 +145,7 @@ func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r.Method, "GET", "HEAD") {
return
}
endpoints := h.clusterStore.Get().ClientURLs()
endpoints := h.clusterInfo.ClientURLs()
w.Write([]byte(strings.Join(endpoints, ", ")))
}
@ -267,7 +267,7 @@ func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
return
}
wcid := strconv.FormatUint(h.clusterStore.Get().ID(), 16)
wcid := strconv.FormatUint(h.clusterInfo.ID(), 16)
w.Header().Set("X-Etcd-Cluster-ID", wcid)
gcid := r.Header.Get("X-Etcd-Cluster-ID")

View File

@ -610,7 +610,7 @@ func TestV2DeprecatedMachinesEndpoint(t *testing.T) {
{"POST", http.StatusMethodNotAllowed},
}
m := NewClientHandler(&etcdserver.EtcdServer{ClusterStore: &fakeCluster{}})
m := NewClientHandler(&etcdserver.EtcdServer{Cluster: &etcdserver.Cluster{}})
s := httptest.NewServer(m)
defer s.Close()
@ -632,19 +632,14 @@ func TestV2DeprecatedMachinesEndpoint(t *testing.T) {
func TestServeMachines(t *testing.T) {
cluster := &fakeCluster{
members: []etcdserver.Member{
{ID: 0xBEEF0, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080"}}},
{ID: 0xBEEF1, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8081"}}},
{ID: 0xBEEF2, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8082"}}},
},
clientURLs: []string{"http://localhost:8080", "http://localhost:8081", "http://localhost:8082"},
}
writer := httptest.NewRecorder()
req, err := http.NewRequest("GET", "", nil)
if err != nil {
t.Fatal(err)
}
h := &serverHandler{clusterStore: cluster}
h := &serverHandler{clusterInfo: cluster}
h.serveMachines(writer, req)
w := "http://localhost:8080, http://localhost:8081, http://localhost:8082"
if g := writer.Body.String(); g != w {
@ -981,9 +976,9 @@ func TestServeRaft(t *testing.T) {
}
req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID)
h := &serverHandler{
timeout: time.Hour,
server: &errServer{tt.serverErr},
clusterStore: &fakeCluster{},
timeout: time.Hour,
server: &errServer{tt.serverErr},
clusterInfo: &fakeCluster{id: 0},
}
rw := httptest.NewRecorder()
h.serveRaft(rw, req)
@ -1750,17 +1745,9 @@ func TestTrimNodeExternPrefix(t *testing.T) {
}
type fakeCluster struct {
members []etcdserver.Member
id uint64
clientURLs []string
}
func (c *fakeCluster) Add(m etcdserver.Member) { return }
func (c *fakeCluster) Get() etcdserver.Cluster {
cl := etcdserver.NewCluster("")
cl.AddSlice(c.members)
return *cl
}
func (c *fakeCluster) Remove(id uint64) { return }
func (c *fakeCluster) IsRemoved(id uint64) bool { return false }
func (c *fakeCluster) ID() uint64 { return c.id }
func (c *fakeCluster) ClientURLs() []string { return c.clientURLs }

View File

@ -21,6 +21,7 @@ import (
"encoding/binary"
"fmt"
"log"
"math/rand"
"path"
"sort"
"strconv"
@ -71,6 +72,15 @@ func NewMember(name string, peerURLs types.URLs, clusterName string, now *time.T
return m
}
// Pick chooses a random address from a given Member's addresses, and returns it as
// an addressible URI. If the given member does not exist, an empty string is returned.
func (m *Member) Pick() string {
if len(m.PeerURLs) == 0 {
panic("member should always have some peer url")
}
return m.PeerURLs[rand.Intn(len(m.PeerURLs))]
}
func memberStoreKey(id uint64) string {
return path.Join(storeMembersPrefix, idAsHex(id))
}

View File

@ -53,3 +53,36 @@ func TestMemberTime(t *testing.T) {
}
}
}
func TestMemberPick(t *testing.T) {
tests := []struct {
memb *Member
urls map[string]bool
}{
{
newTestMemberp(1, []string{"abc", "def", "ghi", "jkl", "mno", "pqr", "stu"}, "", nil),
map[string]bool{
"abc": true,
"def": true,
"ghi": true,
"jkl": true,
"mno": true,
"pqr": true,
"stu": true,
},
},
{
newTestMemberp(2, []string{"xyz"}, "", nil),
map[string]bool{"xyz": true},
},
}
for i, tt := range tests {
for j := 0; j < 1000; j++ {
a := tt.memb.Pick()
if !tt.urls[a] {
t.Errorf("#%d: returned ID %q not in expected range!", i, a)
break
}
}
}
}

121
etcdserver/sender.go Normal file
View File

@ -0,0 +1,121 @@
/*
Copyright 2014 CoreOS, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcdserver
import (
"bytes"
"fmt"
"log"
"net/http"
"strconv"
"time"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/raft/raftpb"
)
const raftPrefix = "/raft"
// Sender creates the default production sender used to transport raft messages
// in the cluster. The returned sender will update the given ServerStats and
// LeaderStats appropriately.
func Sender(t *http.Transport, cl *Cluster, ss *stats.ServerStats, ls *stats.LeaderStats) func(msgs []raftpb.Message) {
c := &http.Client{Transport: t}
return func(msgs []raftpb.Message) {
for _, m := range msgs {
// TODO: reuse go routines
// limit the number of outgoing connections for the same receiver
go send(c, cl, m, ss, ls)
}
}
}
// send uses the given client to send a message to a member in the given
// ClusterStore, retrying up to 3 times for each message. The given
// ServerStats and LeaderStats are updated appropriately
func send(c *http.Client, cl *Cluster, m raftpb.Message, ss *stats.ServerStats, ls *stats.LeaderStats) {
cid := cl.ID()
// TODO (xiangli): reasonable retry logic
for i := 0; i < 3; i++ {
memb := cl.Member(m.To)
if memb == nil {
// TODO: unknown peer id.. what do we do? I
// don't think his should ever happen, need to
// look into this further.
log.Printf("etcdhttp: no member for %d", m.To)
return
}
u := fmt.Sprintf("%s%s", memb.Pick(), raftPrefix)
// TODO: don't block. we should be able to have 1000s
// of messages out at a time.
data, err := m.Marshal()
if err != nil {
log.Println("etcdhttp: dropping message:", err)
return // drop bad message
}
if m.Type == raftpb.MsgApp {
ss.SendAppendReq(len(data))
}
to := idAsHex(m.To)
fs := ls.Follower(to)
start := time.Now()
sent := httpPost(c, u, cid, data)
end := time.Now()
if sent {
fs.Succ(end.Sub(start))
return
}
fs.Fail()
// TODO: backoff
}
}
// httpPost POSTs a data payload to a url using the given client. Returns true
// if the POST succeeds, false on any failure.
func httpPost(c *http.Client, url string, cid uint64, data []byte) bool {
req, err := http.NewRequest("POST", url, bytes.NewBuffer(data))
if err != nil {
// TODO: log the error?
return false
}
req.Header.Set("Content-Type", "application/protobuf")
req.Header.Set("X-Etcd-Cluster-ID", strconv.FormatUint(cid, 16))
resp, err := c.Do(req)
if err != nil {
// TODO: log the error?
return false
}
resp.Body.Close()
switch resp.StatusCode {
case http.StatusPreconditionFailed:
// TODO: shutdown the etcdserver gracefully?
log.Panicf("clusterID mismatch")
return false
case http.StatusForbidden:
// TODO: stop the server
log.Panicf("the member has been removed")
return false
case http.StatusNoContent:
return true
default:
return false
}
}

View File

@ -137,10 +137,9 @@ type EtcdServer struct {
done chan struct{}
stopped chan struct{}
id uint64
clusterID uint64
attributes Attributes
ClusterStore ClusterStore
Cluster *Cluster
node raft.Node
store store.Store
@ -176,12 +175,12 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
st := store.New()
var w *wal.WAL
var n raft.Node
var id, cid uint64
var id uint64
if !wal.Exist(cfg.WALDir()) {
if err := cfg.VerifyBootstrapConfig(); err != nil {
log.Fatalf("etcdserver: %v", err)
}
m := cfg.Cluster.FindName(cfg.Name)
m := cfg.Cluster.MemberFromName(cfg.Name)
if cfg.ShouldDiscover() {
d, err := discovery.New(cfg.DiscoveryURL, m.ID, cfg.Cluster.String())
if err != nil {
@ -191,11 +190,12 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
if err != nil {
log.Fatalf("etcdserver: %v", err)
}
if err = cfg.Cluster.SetMembersFromString(s); err != nil {
if cfg.Cluster, err = NewClusterFromString(cfg.Cluster.name, s); err != nil {
log.Fatalf("etcdserver: %v", err)
}
}
id, cid, n, w = startNode(cfg)
cfg.Cluster.SetStore(st)
id, n, w = startNode(cfg)
} else {
if cfg.ShouldDiscover() {
log.Printf("etcdserver: warn: ignoring discovery: etcd has already been initialized and has a valid log in %q", cfg.WALDir())
@ -210,11 +210,10 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
st.Recovery(snapshot.Data)
index = snapshot.Index
}
id, cid, n, w = restartNode(cfg, index, snapshot)
cfg.Cluster = NewClusterFromStore(cfg.Cluster.name, st)
id, n, w = restartNode(cfg, index, snapshot)
}
cls := &clusterStore{Store: st, id: cid}
sstats := &stats.ServerStats{
Name: cfg.Name,
ID: idAsHex(id),
@ -225,19 +224,18 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
store: st,
node: n,
id: id,
clusterID: cid,
attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
Cluster: cfg.Cluster,
storage: struct {
*wal.WAL
*snap.Snapshotter
}{w, ss},
stats: sstats,
lstats: lstats,
send: Sender(cfg.Transport, cls, sstats, lstats),
Ticker: time.Tick(100 * time.Millisecond),
SyncTicker: time.Tick(500 * time.Millisecond),
snapCount: cfg.SnapCount,
ClusterStore: cls,
stats: sstats,
lstats: lstats,
send: Sender(cfg.Transport, cfg.Cluster, sstats, lstats),
Ticker: time.Tick(100 * time.Millisecond),
SyncTicker: time.Tick(500 * time.Millisecond),
snapCount: cfg.SnapCount,
}
return s
}
@ -268,7 +266,7 @@ func (s *EtcdServer) start() {
}
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
if s.ClusterStore.IsRemoved(m.From) {
if s.Cluster.IsMemberRemoved(m.From) {
return ErrRemoved
}
return s.node.Step(ctx, m)
@ -599,22 +597,22 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes []uint64) error
s.node.ApplyConfChange(cc)
switch cc.Type {
case raftpb.ConfChangeAddNode:
var m Member
if err := json.Unmarshal(cc.Context, &m); err != nil {
m := new(Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
panic("unexpected unmarshal error")
}
if cc.NodeID != m.ID {
panic("unexpected nodeID mismatch")
}
s.ClusterStore.Add(m)
s.Cluster.AddMember(m)
case raftpb.ConfChangeRemoveNode:
s.ClusterStore.Remove(cc.NodeID)
s.Cluster.RemoveMember(cc.NodeID)
}
return nil
}
func (s *EtcdServer) checkConfChange(cc raftpb.ConfChange, nodes []uint64) error {
if s.ClusterStore.IsRemoved(cc.NodeID) {
if s.Cluster.IsMemberRemoved(cc.NodeID) {
return ErrIDRemoved
}
switch cc.Type {
@ -644,11 +642,11 @@ func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) {
s.storage.Cut()
}
func startNode(cfg *ServerConfig) (id, cid uint64, n raft.Node, w *wal.WAL) {
func startNode(cfg *ServerConfig) (id uint64, n raft.Node, w *wal.WAL) {
var err error
// TODO: remove the discoveryURL when it becomes part of the source for
// generating nodeID.
member := cfg.Cluster.FindName(cfg.Name)
member := cfg.Cluster.MemberFromName(cfg.Name)
cfg.Cluster.GenID([]byte(cfg.DiscoveryURL))
metadata := pbutil.MustMarshal(&pb.Metadata{NodeID: member.ID, ClusterID: cfg.Cluster.ID()})
if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
@ -657,19 +655,19 @@ func startNode(cfg *ServerConfig) (id, cid uint64, n raft.Node, w *wal.WAL) {
ids := cfg.Cluster.MemberIDs()
peers := make([]raft.Peer, len(ids))
for i, id := range ids {
ctx, err := json.Marshal((*cfg.Cluster).FindID(id))
ctx, err := json.Marshal((*cfg.Cluster).Member(id))
if err != nil {
log.Fatal(err)
}
peers[i] = raft.Peer{ID: id, Context: ctx}
}
id, cid = member.ID, cfg.Cluster.ID()
log.Printf("etcdserver: start node %d in cluster %d", id, cid)
n = raft.StartNode(member.ID, peers, 10, 1)
id = member.ID
log.Printf("etcdserver: start node %x in cluster %x", id, cfg.Cluster.ID())
n = raft.StartNode(id, peers, 10, 1)
return
}
func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id, cid uint64, n raft.Node, w *wal.WAL) {
func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id uint64, n raft.Node, w *wal.WAL) {
var err error
// restart a node from previous wal
if w, err = wal.OpenAtIndex(cfg.WALDir(), index); err != nil {
@ -682,8 +680,9 @@ func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id
var metadata pb.Metadata
pbutil.MustUnmarshal(&metadata, wmetadata)
id, cid = metadata.NodeID, metadata.ClusterID
log.Printf("etcdserver: restart member %x in cluster %x at commit index %d", id, cid, st.Commit)
id = metadata.NodeID
cfg.Cluster.SetID(metadata.ClusterID)
log.Printf("etcdserver: restart member %x in cluster %x at commit index %d", id, cfg.Cluster.ID(), st.Commit)
n = raft.RestartNode(id, 10, 1, snapshot, st, ents)
return
}

View File

@ -423,10 +423,10 @@ func TestApplyConfChangeError(t *testing.T) {
}
for i, tt := range tests {
n := &nodeRecorder{}
cs := &removedClusterStore{removed: removed}
cl := &Cluster{removed: removed}
srv := &EtcdServer{
node: n,
ClusterStore: cs,
node: n,
Cluster: cl,
}
err := srv.applyConfChange(tt.cc, nodes)
if err != tt.werr {
@ -471,13 +471,15 @@ func testServer(t *testing.T, ns uint64) {
n := raft.StartNode(id, members, 10, 1)
tk := time.NewTicker(10 * time.Millisecond)
defer tk.Stop()
cl := newCluster("abc")
cl.SetStore(&storeRecorder{})
srv := &EtcdServer{
node: n,
store: store.New(),
send: send,
storage: &storageRecorder{},
Ticker: tk.C,
ClusterStore: &clusterStoreRecorder{},
node: n,
store: store.New(),
send: send,
storage: &storageRecorder{},
Ticker: tk.C,
Cluster: cl,
}
srv.start()
ss[i] = srv
@ -538,13 +540,15 @@ func TestDoProposal(t *testing.T) {
tk := make(chan time.Time)
// this makes <-tk always successful, which accelerates internal clock
close(tk)
cl := newCluster("abc")
cl.SetStore(&storeRecorder{})
srv := &EtcdServer{
node: n,
store: st,
send: func(_ []raftpb.Message) {},
storage: &storageRecorder{},
Ticker: tk,
ClusterStore: &clusterStoreRecorder{},
node: n,
store: st,
send: func(_ []raftpb.Message) {},
storage: &storageRecorder{},
Ticker: tk,
Cluster: cl,
}
srv.start()
resp, err := srv.Do(ctx, tt)
@ -782,12 +786,12 @@ func TestTriggerSnap(t *testing.T) {
st := &storeRecorder{}
p := &storageRecorder{}
s := &EtcdServer{
store: st,
send: func(_ []raftpb.Message) {},
storage: p,
node: n,
snapCount: 10,
ClusterStore: &clusterStoreRecorder{},
store: st,
send: func(_ []raftpb.Message) {},
storage: p,
node: n,
snapCount: 10,
Cluster: &Cluster{},
}
s.start()
@ -872,19 +876,20 @@ func TestAddMember(t *testing.T) {
n.readyc <- raft.Ready{
SoftState: &raft.SoftState{
RaftState: raft.StateLeader,
Nodes: []uint64{2, 3},
Nodes: []uint64{2345, 3456},
},
}
cs := &clusterStoreRecorder{}
cl := newTestCluster(nil)
cl.SetStore(&storeRecorder{})
s := &EtcdServer{
node: n,
store: &storeRecorder{},
send: func(_ []raftpb.Message) {},
storage: &storageRecorder{},
ClusterStore: cs,
node: n,
store: &storeRecorder{},
send: func(_ []raftpb.Message) {},
storage: &storageRecorder{},
Cluster: cl,
}
s.start()
m := Member{ID: 1, RaftAttributes: RaftAttributes{PeerURLs: []string{"foo"}}}
m := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"foo"}}}
err := s.AddMember(context.TODO(), m)
gaction := n.Action()
s.Stop()
@ -896,9 +901,8 @@ func TestAddMember(t *testing.T) {
if !reflect.DeepEqual(gaction, wactions) {
t.Errorf("action = %v, want %v", gaction, wactions)
}
wcsactions := []action{{name: "Add", params: []interface{}{m}}}
if g := cs.Action(); !reflect.DeepEqual(g, wcsactions) {
t.Errorf("csaction = %v, want %v", g, wcsactions)
if cl.Member(1234) == nil {
t.Errorf("member with id 1234 is not added")
}
}
@ -908,20 +912,20 @@ func TestRemoveMember(t *testing.T) {
n.readyc <- raft.Ready{
SoftState: &raft.SoftState{
RaftState: raft.StateLeader,
Nodes: []uint64{1, 2, 3},
Nodes: []uint64{1234, 2345, 3456},
},
}
cs := &clusterStoreRecorder{}
cl := newTestCluster([]Member{{ID: 1234}})
cl.SetStore(&storeRecorder{})
s := &EtcdServer{
node: n,
store: &storeRecorder{},
send: func(_ []raftpb.Message) {},
storage: &storageRecorder{},
ClusterStore: cs,
node: n,
store: &storeRecorder{},
send: func(_ []raftpb.Message) {},
storage: &storageRecorder{},
Cluster: cl,
}
s.start()
id := uint64(1)
err := s.RemoveMember(context.TODO(), id)
err := s.RemoveMember(context.TODO(), 1234)
gaction := n.Action()
s.Stop()
@ -932,9 +936,8 @@ func TestRemoveMember(t *testing.T) {
if !reflect.DeepEqual(gaction, wactions) {
t.Errorf("action = %v, want %v", gaction, wactions)
}
wcsactions := []action{{name: "Remove", params: []interface{}{id}}}
if g := cs.Action(); !reflect.DeepEqual(g, wcsactions) {
t.Errorf("csaction = %v, want %v", g, wcsactions)
if cl.Member(1234) != nil {
t.Errorf("member with id 1234 is not removed")
}
}

View File

@ -85,7 +85,7 @@ func (c *cluster) Launch(t *testing.T) {
}
lns := make([]net.Listener, c.Size)
clusterCfg := etcdserver.NewCluster(clusterName)
infos := make([]etcdserver.MemberInfo, c.Size)
for i := 0; i < c.Size; i++ {
l := newLocalListener(t)
// each member claims only one peer listener
@ -94,9 +94,7 @@ func (c *cluster) Launch(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if _, err = clusterCfg.AddMemberFromURLs(c.name(i), listenURLs); err != nil {
t.Fatal(err)
}
infos[i] = etcdserver.MemberInfo{Name: c.name(i), PeerURLs: listenURLs}
}
var err error
@ -114,7 +112,10 @@ func (c *cluster) Launch(t *testing.T) {
if err != nil {
t.Fatal(err)
}
m.Cluster = clusterCfg
m.Cluster, err = etcdserver.NewClusterFromMemberInfos(clusterName, infos)
if err != nil {
t.Fatal(err)
}
m.ClusterState = etcdserver.ClusterStateValueNew
m.Transport, err = transport.NewTransport(transport.TLSInfo{})
if err != nil {

11
main.go
View File

@ -262,7 +262,6 @@ func startProxy() {
// setupCluster sets up the cluster definition for bootstrap or discovery.
func setupCluster() error {
cluster = etcdserver.NewCluster(*initialClusterName)
set := make(map[string]bool)
fs.Visit(func(f *flag.Flag) {
set[f.Name] = true
@ -275,17 +274,17 @@ func setupCluster() error {
return err
}
err = nil
switch {
case set["discovery"]:
cluster = etcdserver.NewCluster(*durl)
_, err := cluster.AddMemberFromURLs(*name, apurls)
return err
infos := []etcdserver.MemberInfo{{Name: *name, PeerURLs: apurls}}
cluster, err = etcdserver.NewClusterFromMemberInfos(*durl, infos)
case set["initial-cluster"]:
fallthrough
default:
// We're statically configured, and cluster has appropriately been set.
// Try to configure by indexing the static cluster by name.
cluster.SetMembersFromString(*initialCluster)
cluster, err = etcdserver.NewClusterFromString(*initialClusterName, *initialCluster)
}
return nil
return err
}