*: generate clusterid

Xiang Li 2014-10-11 19:20:14 +08:00
parent f12583c163
commit ea6bcacfe4
12 changed files with 140 additions and 62 deletions

View File

@ -1,6 +1,8 @@
package etcdserver
import (
"crypto/sha1"
"encoding/binary"
"fmt"
"math/rand"
"net/url"
@ -12,14 +14,21 @@ import (
)
// Cluster is a list of Members that belong to the same raft cluster
type Cluster map[uint64]*Member
type Cluster struct {
id uint64
members map[uint64]*Member
}
func NewCluster() *Cluster {
return &Cluster{members: make(map[uint64]*Member)}
}
func (c Cluster) FindID(id uint64) *Member {
return c[id]
return c.members[id]
}
func (c Cluster) FindName(name string) *Member {
for _, m := range c {
for _, m := range c.members {
if m.Name == name {
return m
}
@ -32,7 +41,7 @@ func (c Cluster) Add(m Member) error {
if c.FindID(m.ID) != nil {
return fmt.Errorf("Member exists with identical ID %v", m)
}
c[m.ID] = &m
c.members[m.ID] = &m
return nil
}
@ -64,7 +73,7 @@ func (c Cluster) Pick(id uint64) string {
// Set parses command line sets of names to IPs formatted like:
// mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach0=http://1.1.1.1,mach1=http://2.2.2.2,mach1=http://3.3.3.3
func (c *Cluster) Set(s string) error {
*c = Cluster{}
*c = *NewCluster()
v, err := url.ParseQuery(strings.Replace(s, ",", "&", -1))
if err != nil {
return err
@ -84,9 +93,19 @@ func (c *Cluster) Set(s string) error {
return nil
}
func (c *Cluster) GenID(salt []byte) {
mIDs := c.MemberIDs()
b := make([]byte, 8*len(mIDs))
for i, id := range mIDs {
binary.BigEndian.PutUint64(b[8*i:], id)
}
hash := sha1.Sum(append(b, salt...))
c.id = binary.BigEndian.Uint64(hash[:8])
}
func (c Cluster) String() string {
sl := []string{}
for _, m := range c {
for _, m := range c.members {
for _, u := range m.PeerURLs {
sl = append(sl, fmt.Sprintf("%s=%s", m.Name, u))
}
@ -95,9 +114,13 @@ func (c Cluster) String() string {
return strings.Join(sl, ",")
}
func (c Cluster) IDs() []uint64 {
func (c Cluster) ID() uint64 { return c.id }
func (c Cluster) Members() map[uint64]*Member { return c.members }
func (c Cluster) MemberIDs() []uint64 {
var ids []uint64
for _, m := range c {
for _, m := range c.members {
ids = append(ids, m.ID)
}
sort.Sort(types.Uint64Slice(ids))
@ -109,7 +132,7 @@ func (c Cluster) IDs() []uint64 {
// ascending lexicographical order.
func (c Cluster) PeerURLs() []string {
endpoints := make([]string, 0)
for _, p := range c {
for _, p := range c.members {
for _, addr := range p.PeerURLs {
endpoints = append(endpoints, addr)
}
@ -123,7 +146,7 @@ func (c Cluster) PeerURLs() []string {
// ascending lexicographical order.
func (c Cluster) ClientURLs() []string {
urls := make([]string, 0)
for _, p := range c {
for _, p := range c.members {
for _, url := range p.ClientURLs {
urls = append(urls, url)
}
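
The heart of this commit is GenID above: the sorted member IDs are serialized as big-endian uint64s, the salt (the discovery URL, in startNode below) is appended, and the first 8 bytes of the SHA-1 digest become the cluster ID, so the same membership and salt always produce the same ID. A minimal standalone sketch of that scheme (genClusterID is a hypothetical helper for illustration, not part of the etcd API):

package main

import (
	"crypto/sha1"
	"encoding/binary"
	"fmt"
	"sort"
)

// genClusterID mirrors Cluster.GenID from the diff above: hash the sorted,
// big-endian-encoded member IDs together with an optional salt and keep the
// first 8 bytes of the SHA-1 digest as a uint64.
func genClusterID(memberIDs []uint64, salt []byte) uint64 {
	ids := append([]uint64(nil), memberIDs...)
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })

	b := make([]byte, 8*len(ids))
	for i, id := range ids {
		binary.BigEndian.PutUint64(b[8*i:], id)
	}
	hash := sha1.Sum(append(b, salt...))
	return binary.BigEndian.Uint64(hash[:8])
}

func main() {
	// Same members and salt give the same ID; adding a member or changing
	// the salt changes it, which is what TestClusterGenID exercises below.
	fmt.Printf("%x\n", genClusterID([]uint64{1, 2}, nil))
	fmt.Printf("%x\n", genClusterID([]uint64{1, 2, 3}, nil))
	fmt.Printf("%x\n", genClusterID([]uint64{1, 2, 3}, []byte("http://discovery.etcd.io/12345678")))
}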

View File

@ -55,7 +55,7 @@ func (s *clusterStore) Add(m Member) {
// TODO(philips): keep the latest copy without going to the store to avoid the
// lock here.
func (s *clusterStore) Get() Cluster {
c := &Cluster{}
c := NewCluster()
e, err := s.Store.Get(membersKVPrefix, true, true)
if err != nil {
if v, ok := err.(*etcdErr.Error); ok && v.ErrorCode == etcdErr.EcodeKeyNotFound {

View File

@ -80,12 +80,12 @@ func TestClusterStoreGet(t *testing.T) {
for _, m := range tt.mems {
cs.Add(m)
}
c := Cluster{}
c := NewCluster()
if err := c.AddSlice(tt.mems); err != nil {
t.Fatal(err)
}
if g := cs.Get(); !reflect.DeepEqual(g, c) {
t.Errorf("#%d: mems = %v, want %v", i, g, c)
if g := cs.Get(); !reflect.DeepEqual(&g, c) {
t.Errorf("#%d: mems = %v, want %v", i, &g, c)
}
}
}

View File

@ -8,28 +8,27 @@ import (
func TestClusterAddSlice(t *testing.T) {
tests := []struct {
mems []Member
want *Cluster
}{
{
[]Member{},
&Cluster{},
NewCluster(),
},
{
[]Member{
newTestMember(1, []string{"foo", "bar"}, "", nil),
newTestMember(2, []string{"baz"}, "", nil),
},
&Cluster{
1: newTestMemberp(1, []string{"foo", "bar"}, "", nil),
2: newTestMemberp(2, []string{"baz"}, "", nil),
members: map[uint64]*Member{
1: newTestMemberp(1, []string{"foo", "bar"}, "", nil),
2: newTestMemberp(2, []string{"baz"}, "", nil),
},
},
},
}
for i, tt := range tests {
c := &Cluster{}
c := NewCluster()
if err := c.AddSlice(tt.mems); err != nil {
t.Errorf("#%d: err=%#v, want nil", i, err)
continue
@ -42,7 +41,9 @@ func TestClusterAddSlice(t *testing.T) {
func TestClusterAddSliceBad(t *testing.T) {
c := Cluster{
1: newTestMemberp(1, nil, "", nil),
members: map[uint64]*Member{
1: newTestMemberp(1, nil, "", nil),
},
}
if err := c.AddSlice([]Member{newTestMember(1, nil, "", nil)}); err == nil {
t.Error("want err, but got nil")
@ -51,9 +52,11 @@ func TestClusterAddSliceBad(t *testing.T) {
func TestClusterPick(t *testing.T) {
cs := Cluster{
1: newTestMemberp(1, []string{"abc", "def", "ghi", "jkl", "mno", "pqr", "stu"}, "", nil),
2: newTestMemberp(2, []string{"xyz"}, "", nil),
3: newTestMemberp(3, []string{}, "", nil),
members: map[uint64]*Member{
1: newTestMemberp(1, []string{"abc", "def", "ghi", "jkl", "mno", "pqr", "stu"}, "", nil),
2: newTestMemberp(2, []string{"xyz"}, "", nil),
3: newTestMemberp(3, []string{}, "", nil),
},
}
ids := map[string]bool{
"abc": true,
@ -115,7 +118,7 @@ func TestClusterFind(t *testing.T) {
},
}
for i, tt := range tests {
c := Cluster{}
c := NewCluster()
c.AddSlice(tt.mems)
m := c.FindName(tt.name)
@ -131,7 +134,7 @@ func TestClusterFind(t *testing.T) {
}
for i, tt := range tests {
c := Cluster{}
c := NewCluster()
c.AddSlice(tt.mems)
m := c.FindID(tt.id)
@ -162,7 +165,7 @@ func TestClusterSet(t *testing.T) {
},
}
for i, tt := range tests {
c := Cluster{}
c := NewCluster()
if err := c.AddSlice(tt.mems); err != nil {
t.Error(err)
}
@ -176,6 +179,32 @@ func TestClusterSet(t *testing.T) {
}
}
func TestClusterGenID(t *testing.T) {
cs := NewCluster()
cs.AddSlice([]Member{
newTestMember(1, nil, "", nil),
newTestMember(2, nil, "", nil),
})
cs.GenID(nil)
if cs.ID() == 0 {
t.Fatalf("cluster.ID = %v, want not 0", cs.ID())
}
previd := cs.ID()
cs.Add(newTestMember(3, nil, "", nil))
cs.GenID(nil)
if cs.ID() == previd {
t.Fatalf("cluster.ID = %v, want not %v", cs.ID(), previd)
}
previd = cs.ID()
cs.GenID([]byte("http://discovery.etcd.io/12345678"))
if cs.ID() == previd {
t.Fatalf("cluster.ID = %v, want not %v", cs.ID(), previd)
}
}
func TestClusterSetBad(t *testing.T) {
tests := []string{
// invalid URL
@ -187,22 +216,22 @@ func TestClusterSetBad(t *testing.T) {
// "06b2f82fd81b2c20=http://128.193.4.20:2379,02c60cb75083ceef=http://128.193.4.20:2379",
}
for i, tt := range tests {
g := Cluster{}
g := NewCluster()
if err := g.Set(tt); err == nil {
t.Errorf("#%d: set = %v, want err", i, tt)
}
}
}
func TestClusterIDs(t *testing.T) {
cs := Cluster{}
func TestClusterMemberIDs(t *testing.T) {
cs := NewCluster()
cs.AddSlice([]Member{
newTestMember(1, nil, "", nil),
newTestMember(4, nil, "", nil),
newTestMember(100, nil, "", nil),
})
w := []uint64{1, 4, 100}
g := cs.IDs()
g := cs.MemberIDs()
if !reflect.DeepEqual(w, g) {
t.Errorf("IDs=%+v, want %+v", g, w)
}
@ -214,7 +243,7 @@ func TestClusterAddBad(t *testing.T) {
newTestMember(1, nil, "mem1", nil),
newTestMember(1, nil, "mem2", nil),
}
c := &Cluster{}
c := NewCluster()
c.Add(newTestMember(1, nil, "mem1", nil))
for i, m := range mems {
if err := c.Add(m); err == nil {
@ -270,7 +299,7 @@ func TestClusterPeerURLs(t *testing.T) {
}
for i, tt := range tests {
c := Cluster{}
c := NewCluster()
if err := c.AddSlice(tt.mems); err != nil {
t.Errorf("AddSlice error: %v", err)
continue
@ -329,7 +358,7 @@ func TestClusterClientURLs(t *testing.T) {
}
for i, tt := range tests {
c := Cluster{}
c := NewCluster()
if err := c.AddSlice(tt.mems); err != nil {
t.Errorf("AddSlice error: %v", err)
continue

View File

@ -31,7 +31,7 @@ func (c *ServerConfig) Verify() error {
// No identical IPs in the cluster peer list
urlMap := make(map[string]bool)
for _, m := range *c.Cluster {
for _, m := range c.Cluster.Members() {
for _, url := range m.PeerURLs {
if urlMap[url] {
return fmt.Errorf("duplicate url %v in server config", url)

View File

@ -1529,7 +1529,7 @@ type fakeCluster struct {
func (c *fakeCluster) Add(m etcdserver.Member) { return }
func (c *fakeCluster) Get() etcdserver.Cluster {
cl := &etcdserver.Cluster{}
cl := etcdserver.NewCluster()
cl.AddSlice(c.members)
return *cl
}

View File

@ -54,6 +54,7 @@ func (*Request) ProtoMessage() {}
type Metadata struct {
NodeID uint64 `protobuf:"varint,1,req" json:"NodeID"`
ClusterID uint64 `protobuf:"varint,2,req" json:"ClusterID"`
XXX_unrecognized []byte `json:"-"`
}
@ -422,6 +423,21 @@ func (m *Metadata) Unmarshal(data []byte) error {
break
}
}
case 2:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
m.ClusterID |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
default:
var sizeOfWire int
for {
@ -479,6 +495,7 @@ func (m *Metadata) Size() (n int) {
var l int
_ = l
n += 1 + sovEtcdserver(uint64(m.NodeID))
n += 1 + sovEtcdserver(uint64(m.ClusterID))
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
@ -627,6 +644,9 @@ func (m *Metadata) MarshalTo(data []byte) (n int, err error) {
data[i] = 0x8
i++
i = encodeVarintEtcdserver(data, i, uint64(m.NodeID))
data[i] = 0x10
i++
i = encodeVarintEtcdserver(data, i, uint64(m.ClusterID))
if m.XXX_unrecognized != nil {
i += copy(data[i:], m.XXX_unrecognized)
}
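
The regenerated protobuf code above writes ClusterID as field 2 with wire type 0: a key byte of 0x10 ((2<<3)|0) followed by the value as a base-128 varint (7 payload bits per byte, high bit set while more bytes follow), and the Unmarshal loop reverses that shift-and-or decoding. A small standalone sketch of the same varint scheme (putUvarint/readUvarint are illustrative helpers, not the generated API):

package main

import "fmt"

// putUvarint encodes x as a protobuf base-128 varint: low 7 bits per byte,
// high bit set on every byte except the last.
func putUvarint(x uint64) []byte {
	var out []byte
	for x >= 0x80 {
		out = append(out, byte(x)|0x80)
		x >>= 7
	}
	return append(out, byte(x))
}

// readUvarint reverses the encoding, mirroring the shift loop in
// Metadata.Unmarshal above; it returns the value and bytes consumed.
func readUvarint(b []byte) (uint64, int) {
	var x uint64
	for i, c := range b {
		x |= uint64(c&0x7F) << (7 * uint(i))
		if c < 0x80 {
			return x, i + 1
		}
	}
	return 0, 0 // truncated input
}

func main() {
	// Field 2, wire type 0 gives the 0x10 key byte written in MarshalTo
	// above, followed by the varint-encoded ClusterID value.
	enc := append([]byte{0x10}, putUvarint(300)...)
	v, n := readUvarint(enc[1:])
	fmt.Printf("% x -> %d (%d value bytes)\n", enc, v, n)
}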

View File

@ -27,5 +27,6 @@ message Request {
}
message Metadata {
required uint64 NodeID = 1 [(gogoproto.nullable) = false];
required uint64 NodeID    = 1 [(gogoproto.nullable) = false];
required uint64 ClusterID = 2 [(gogoproto.nullable) = false];
}

View File

@ -112,6 +112,7 @@ type EtcdServer struct {
w wait.Wait
done chan struct{}
id uint64
clusterID uint64
attributes Attributes
ClusterStore ClusterStore
@ -153,6 +154,7 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
st := store.New()
var w *wal.WAL
var n raft.Node
var id, cid uint64
if !wal.Exist(cfg.WALDir()) {
if !cfg.IsBootstrap() {
log.Fatalf("etcdserver: initial cluster state unset and no wal or discovery URL found")
@ -170,7 +172,7 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
log.Fatalf("etcdserver: %v", err)
}
}
n, w = startNode(cfg)
id, cid, n, w = startNode(cfg)
} else {
if cfg.ShouldDiscover() {
log.Printf("etcdserver: warn: ignoring discovery: etcd has already been initialized and has a valid log in %q", cfg.WALDir())
@ -185,7 +187,7 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
st.Recovery(snapshot.Data)
index = snapshot.Index
}
n, w = restartNode(cfg, index, snapshot)
id, cid, n, w = restartNode(cfg, index, snapshot)
}
cls := &clusterStore{Store: st}
@ -199,7 +201,8 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
s := &EtcdServer{
store: st,
node: n,
id: cfg.ID(),
id: id,
clusterID: cid,
attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
storage: struct {
*wal.WAL
@ -617,27 +620,29 @@ func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) {
s.storage.Cut()
}
func startNode(cfg *ServerConfig) (n raft.Node, w *wal.WAL) {
func startNode(cfg *ServerConfig) (id, cid uint64, n raft.Node, w *wal.WAL) {
var err error
metadata := pbutil.MustMarshal(&pb.Metadata{NodeID: cfg.ID()})
cfg.Cluster.GenID([]byte(cfg.DiscoveryURL))
metadata := pbutil.MustMarshal(&pb.Metadata{NodeID: cfg.ID(), ClusterID: cfg.Cluster.ID()})
if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
log.Fatal(err)
}
ids := cfg.Cluster.IDs()
ids := cfg.Cluster.MemberIDs()
peers := make([]raft.Peer, len(ids))
for i, id := range ids {
ctx, err := json.Marshal((*cfg.Cluster)[id])
ctx, err := json.Marshal((*cfg.Cluster).FindID(id))
if err != nil {
log.Fatal(err)
}
peers[i] = raft.Peer{ID: id, Context: ctx}
}
log.Printf("etcdserver: start node %d", cfg.ID())
id, cid = cfg.ID(), cfg.Cluster.ID()
log.Printf("etcdserver: start node %d in cluster %d", cfg.ID(), cfg.Cluster.ID())
n = raft.StartNode(cfg.ID(), peers, 10, 1)
return
}
func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (n raft.Node, w *wal.WAL) {
func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id, cid uint64, n raft.Node, w *wal.WAL) {
var err error
// restart a node from previous wal
if w, err = wal.OpenAtIndex(cfg.WALDir(), index); err != nil {
@ -650,8 +655,10 @@ func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (n
var metadata pb.Metadata
pbutil.MustUnmarshal(&metadata, wmetadata)
log.Printf("etcdserver: restart node %d at commit index %d", metadata.NodeID, st.Commit)
n = raft.RestartNode(metadata.NodeID, 10, 1, snapshot, st, ents)
id, cid = metadata.NodeID, metadata.ClusterID
log.Printf("etcdserver: restart node %d in cluster %d at commit index %d",
metadata.NodeID, metadata.ClusterID, st.Commit)
n = raft.RestartNode(id, 10, 1, snapshot, st, ents)
return
}
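
With this change the node and cluster IDs ride along in the WAL's metadata record: startNode marshals pb.Metadata{NodeID, ClusterID} and hands it to wal.Create, and restartNode reads it back, so a restarted member recovers the same cluster ID without re-running GenID. A rough sketch of that round trip, assuming the generated Marshal/Unmarshal helpers on etcdserverpb.Metadata and the import path in use at the time:

package main

import (
	"fmt"
	"log"

	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)

func main() {
	// First start: both IDs are recorded before the WAL is created; the
	// encoded bytes are what startNode passes to wal.Create.
	meta := pb.Metadata{NodeID: 0x1, ClusterID: 0xcafe}
	b, err := meta.Marshal()
	if err != nil {
		log.Fatal(err)
	}

	// Restart: the metadata record read back from the WAL is decoded,
	// restoring the same node and cluster IDs for restartNode.
	var got pb.Metadata
	if err := got.Unmarshal(b); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("node=%x cluster=%x\n", got.NodeID, got.ClusterID)
}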

View File

@ -446,7 +446,6 @@ func testServer(t *testing.T, ns uint64) {
ids[i] = i + 1
}
members := mustMakePeerSlice(t, ids...)
for i := uint64(0); i < ns; i++ {
id := i + 1
n := raft.StartNode(id, members, 10, 1)
@ -781,11 +780,12 @@ func TestTriggerSnap(t *testing.T) {
gaction := p.Action()
// each operation is recorded as a Save
// BootstrapConfig/Nop + (SnapCount - 1) * Puts + Cut + SaveSnap = Save + (SnapCount - 1) * Save + Cut + SaveSnap
if len(gaction) != 2+int(s.snapCount) {
t.Fatalf("len(action) = %d, want %d", len(gaction), 2+int(s.snapCount))
wcnt := 2 + int(s.snapCount)
if len(gaction) != wcnt {
t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt)
}
if !reflect.DeepEqual(gaction[11], action{name: "SaveSnap"}) {
t.Errorf("action = %s, want SaveSnap", gaction[11])
if !reflect.DeepEqual(gaction[wcnt-1], action{name: "SaveSnap"}) {
t.Errorf("action = %s, want SaveSnap", gaction[wcnt-1])
}
}
@ -1321,7 +1321,7 @@ func (cs *clusterStoreRecorder) Add(m Member) {
}
func (cs *clusterStoreRecorder) Get() Cluster {
cs.record(action{name: "Get"})
return nil
return Cluster{}
}
func (cs *clusterStoreRecorder) Remove(id uint64) {
cs.record(action{name: "Remove", params: []interface{}{id}})

View File

@ -130,17 +130,16 @@ func StartNode(id uint64, peers []Peer, election, heartbeat int) Node {
n := newNode()
r := newRaft(id, nil, election, heartbeat)
ents := make([]pb.Entry, len(peers))
for i, peer := range peers {
for _, peer := range peers {
cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
data, err := cc.Marshal()
d, err := cc.Marshal()
if err != nil {
panic("unexpected marshal error")
}
ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d}
r.raftLog.append(r.raftLog.lastIndex(), e)
}
r.raftLog.append(0, ents...)
r.raftLog.committed = uint64(len(ents))
r.raftLog.committed = r.raftLog.lastIndex()
go n.run(r)
return &n

View File

@ -174,7 +174,6 @@ func TestNode(t *testing.T) {
CommittedEntries: []raftpb.Entry{{Term: 1, Index: 3, Data: []byte("foo")}},
},
}
n := StartNode(1, []Peer{{ID: 1}}, 10, 1)
n.ApplyConfChange(cc)
n.Campaign(ctx)