diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index 535870619..c4c9af5d1 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -17,6 +17,8 @@ package etcdserver import ( + "crypto/sha1" + "encoding/binary" "fmt" "math/rand" "net/url" @@ -28,14 +30,21 @@ import ( ) // Cluster is a list of Members that belong to the same raft cluster -type Cluster map[uint64]*Member +type Cluster struct { + id uint64 + members map[uint64]*Member +} + +func NewCluster() *Cluster { + return &Cluster{members: make(map[uint64]*Member)} +} func (c Cluster) FindID(id uint64) *Member { - return c[id] + return c.members[id] } func (c Cluster) FindName(name string) *Member { - for _, m := range c { + for _, m := range c.members { if m.Name == name { return m } @@ -48,7 +57,7 @@ func (c Cluster) Add(m Member) error { if c.FindID(m.ID) != nil { return fmt.Errorf("Member exists with identical ID %v", m) } - c[m.ID] = &m + c.members[m.ID] = &m return nil } @@ -80,7 +89,7 @@ func (c Cluster) Pick(id uint64) string { // Set parses command line sets of names to IPs formatted like: // mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach0=http://1.1.1.1,mach1=http://2.2.2.2,mach1=http://3.3.3.3 func (c *Cluster) Set(s string) error { - *c = Cluster{} + *c = *NewCluster() v, err := url.ParseQuery(strings.Replace(s, ",", "&", -1)) if err != nil { return err @@ -100,9 +109,19 @@ func (c *Cluster) Set(s string) error { return nil } +func (c *Cluster) GenID(salt []byte) { + mIDs := c.MemberIDs() + b := make([]byte, 8*len(mIDs)) + for i, id := range mIDs { + binary.BigEndian.PutUint64(b[8*i:], id) + } + hash := sha1.Sum(append(b, salt...)) + c.id = binary.BigEndian.Uint64(hash[:8]) +} + func (c Cluster) String() string { sl := []string{} - for _, m := range c { + for _, m := range c.members { for _, u := range m.PeerURLs { sl = append(sl, fmt.Sprintf("%s=%s", m.Name, u)) } @@ -111,9 +130,13 @@ func (c Cluster) String() string { return strings.Join(sl, ",") } -func (c Cluster) IDs() []uint64 { +func (c Cluster) ID() uint64 { return c.id } + +func (c Cluster) Members() map[uint64]*Member { return c.members } + +func (c Cluster) MemberIDs() []uint64 { var ids []uint64 - for _, m := range c { + for _, m := range c.members { ids = append(ids, m.ID) } sort.Sort(types.Uint64Slice(ids)) @@ -125,7 +148,7 @@ func (c Cluster) IDs() []uint64 { // ascending lexicographical order. func (c Cluster) PeerURLs() []string { endpoints := make([]string, 0) - for _, p := range c { + for _, p := range c.members { for _, addr := range p.PeerURLs { endpoints = append(endpoints, addr) } @@ -139,7 +162,7 @@ func (c Cluster) PeerURLs() []string { // ascending lexicographical order. func (c Cluster) ClientURLs() []string { urls := make([]string, 0) - for _, p := range c { + for _, p := range c.members { for _, url := range p.ClientURLs { urls = append(urls, url) } diff --git a/etcdserver/cluster_store.go b/etcdserver/cluster_store.go index 6cea96c12..a17b99962 100644 --- a/etcdserver/cluster_store.go +++ b/etcdserver/cluster_store.go @@ -71,7 +71,7 @@ func (s *clusterStore) Add(m Member) { // TODO(philips): keep the latest copy without going to the store to avoid the // lock here. func (s *clusterStore) Get() Cluster { - c := &Cluster{} + c := NewCluster() e, err := s.Store.Get(membersKVPrefix, true, true) if err != nil { if v, ok := err.(*etcdErr.Error); ok && v.ErrorCode == etcdErr.EcodeKeyNotFound { diff --git a/etcdserver/cluster_store_test.go b/etcdserver/cluster_store_test.go index 4b33de779..3200f9ec2 100644 --- a/etcdserver/cluster_store_test.go +++ b/etcdserver/cluster_store_test.go @@ -96,12 +96,12 @@ func TestClusterStoreGet(t *testing.T) { for _, m := range tt.mems { cs.Add(m) } - c := Cluster{} + c := NewCluster() if err := c.AddSlice(tt.mems); err != nil { t.Fatal(err) } - if g := cs.Get(); !reflect.DeepEqual(g, c) { - t.Errorf("#%d: mems = %v, want %v", i, g, c) + if g := cs.Get(); !reflect.DeepEqual(&g, c) { + t.Errorf("#%d: mems = %v, want %v", i, &g, c) } } } diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go index 3c1e29fb7..6c3da6a43 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/cluster_test.go @@ -24,28 +24,27 @@ import ( func TestClusterAddSlice(t *testing.T) { tests := []struct { mems []Member - want *Cluster }{ { []Member{}, - - &Cluster{}, + NewCluster(), }, { []Member{ newTestMember(1, []string{"foo", "bar"}, "", nil), newTestMember(2, []string{"baz"}, "", nil), }, - &Cluster{ - 1: newTestMemberp(1, []string{"foo", "bar"}, "", nil), - 2: newTestMemberp(2, []string{"baz"}, "", nil), + members: map[uint64]*Member{ + 1: newTestMemberp(1, []string{"foo", "bar"}, "", nil), + 2: newTestMemberp(2, []string{"baz"}, "", nil), + }, }, }, } for i, tt := range tests { - c := &Cluster{} + c := NewCluster() if err := c.AddSlice(tt.mems); err != nil { t.Errorf("#%d: err=%#v, want nil", i, err) continue @@ -58,7 +57,9 @@ func TestClusterAddSlice(t *testing.T) { func TestClusterAddSliceBad(t *testing.T) { c := Cluster{ - 1: newTestMemberp(1, nil, "", nil), + members: map[uint64]*Member{ + 1: newTestMemberp(1, nil, "", nil), + }, } if err := c.AddSlice([]Member{newTestMember(1, nil, "", nil)}); err == nil { t.Error("want err, but got nil") @@ -67,9 +68,11 @@ func TestClusterAddSliceBad(t *testing.T) { func TestClusterPick(t *testing.T) { cs := Cluster{ - 1: newTestMemberp(1, []string{"abc", "def", "ghi", "jkl", "mno", "pqr", "stu"}, "", nil), - 2: newTestMemberp(2, []string{"xyz"}, "", nil), - 3: newTestMemberp(3, []string{}, "", nil), + members: map[uint64]*Member{ + 1: newTestMemberp(1, []string{"abc", "def", "ghi", "jkl", "mno", "pqr", "stu"}, "", nil), + 2: newTestMemberp(2, []string{"xyz"}, "", nil), + 3: newTestMemberp(3, []string{}, "", nil), + }, } ids := map[string]bool{ "abc": true, @@ -131,7 +134,7 @@ func TestClusterFind(t *testing.T) { }, } for i, tt := range tests { - c := Cluster{} + c := NewCluster() c.AddSlice(tt.mems) m := c.FindName(tt.name) @@ -147,7 +150,7 @@ func TestClusterFind(t *testing.T) { } for i, tt := range tests { - c := Cluster{} + c := NewCluster() c.AddSlice(tt.mems) m := c.FindID(tt.id) @@ -178,7 +181,7 @@ func TestClusterSet(t *testing.T) { }, } for i, tt := range tests { - c := Cluster{} + c := NewCluster() if err := c.AddSlice(tt.mems); err != nil { t.Error(err) } @@ -192,6 +195,32 @@ func TestClusterSet(t *testing.T) { } } +func TestClusterGenID(t *testing.T) { + cs := NewCluster() + cs.AddSlice([]Member{ + newTestMember(1, nil, "", nil), + newTestMember(2, nil, "", nil), + }) + + cs.GenID(nil) + if cs.ID() == 0 { + t.Fatalf("cluster.ID = %v, want not 0", cs.ID()) + } + previd := cs.ID() + + cs.Add(newTestMember(3, nil, "", nil)) + cs.GenID(nil) + if cs.ID() == previd { + t.Fatalf("cluster.ID = %v, want not %v", cs.ID(), previd) + } + previd = cs.ID() + + cs.GenID([]byte("http://discovery.etcd.io/12345678")) + if cs.ID() == previd { + t.Fatalf("cluster.ID = %v, want not %v", cs.ID(), previd) + } +} + func TestClusterSetBad(t *testing.T) { tests := []string{ // invalid URL @@ -203,22 +232,22 @@ func TestClusterSetBad(t *testing.T) { // "06b2f82fd81b2c20=http://128.193.4.20:2379,02c60cb75083ceef=http://128.193.4.20:2379", } for i, tt := range tests { - g := Cluster{} + g := NewCluster() if err := g.Set(tt); err == nil { t.Errorf("#%d: set = %v, want err", i, tt) } } } -func TestClusterIDs(t *testing.T) { - cs := Cluster{} +func TestClusterMemberIDs(t *testing.T) { + cs := NewCluster() cs.AddSlice([]Member{ newTestMember(1, nil, "", nil), newTestMember(4, nil, "", nil), newTestMember(100, nil, "", nil), }) w := []uint64{1, 4, 100} - g := cs.IDs() + g := cs.MemberIDs() if !reflect.DeepEqual(w, g) { t.Errorf("IDs=%+v, want %+v", g, w) } @@ -230,7 +259,7 @@ func TestClusterAddBad(t *testing.T) { newTestMember(1, nil, "mem1", nil), newTestMember(1, nil, "mem2", nil), } - c := &Cluster{} + c := NewCluster() c.Add(newTestMember(1, nil, "mem1", nil)) for i, m := range mems { if err := c.Add(m); err == nil { @@ -286,7 +315,7 @@ func TestClusterPeerURLs(t *testing.T) { } for i, tt := range tests { - c := Cluster{} + c := NewCluster() if err := c.AddSlice(tt.mems); err != nil { t.Errorf("AddSlice error: %v", err) continue @@ -345,7 +374,7 @@ func TestClusterClientURLs(t *testing.T) { } for i, tt := range tests { - c := Cluster{} + c := NewCluster() if err := c.AddSlice(tt.mems); err != nil { t.Errorf("AddSlice error: %v", err) continue diff --git a/etcdserver/config.go b/etcdserver/config.go index 3ba84127a..049d3cf15 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -55,7 +55,7 @@ func (c *ServerConfig) VerifyBootstrapConfig() error { // No identical IPs in the cluster peer list urlMap := make(map[string]bool) - for _, m := range *c.Cluster { + for _, m := range c.Cluster.Members() { for _, url := range m.PeerURLs { if urlMap[url] { return fmt.Errorf("duplicate url %v in server config", url) diff --git a/etcdserver/etcdhttp/http_test.go b/etcdserver/etcdhttp/http_test.go index a3a09c71e..2c6ada670 100644 --- a/etcdserver/etcdhttp/http_test.go +++ b/etcdserver/etcdhttp/http_test.go @@ -1545,7 +1545,7 @@ type fakeCluster struct { func (c *fakeCluster) Add(m etcdserver.Member) { return } func (c *fakeCluster) Get() etcdserver.Cluster { - cl := &etcdserver.Cluster{} + cl := etcdserver.NewCluster() cl.AddSlice(c.members) return *cl } diff --git a/etcdserver/etcdserverpb/etcdserver.pb.go b/etcdserver/etcdserverpb/etcdserver.pb.go index 07baad0d8..02544b0ac 100644 --- a/etcdserver/etcdserverpb/etcdserver.pb.go +++ b/etcdserver/etcdserverpb/etcdserver.pb.go @@ -54,6 +54,7 @@ func (*Request) ProtoMessage() {} type Metadata struct { NodeID uint64 `protobuf:"varint,1,req" json:"NodeID"` + ClusterID uint64 `protobuf:"varint,2,req" json:"ClusterID"` XXX_unrecognized []byte `json:"-"` } @@ -422,6 +423,21 @@ func (m *Metadata) Unmarshal(data []byte) error { break } } + case 2: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.ClusterID |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } default: var sizeOfWire int for { @@ -479,6 +495,7 @@ func (m *Metadata) Size() (n int) { var l int _ = l n += 1 + sovEtcdserver(uint64(m.NodeID)) + n += 1 + sovEtcdserver(uint64(m.ClusterID)) if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -627,6 +644,9 @@ func (m *Metadata) MarshalTo(data []byte) (n int, err error) { data[i] = 0x8 i++ i = encodeVarintEtcdserver(data, i, uint64(m.NodeID)) + data[i] = 0x10 + i++ + i = encodeVarintEtcdserver(data, i, uint64(m.ClusterID)) if m.XXX_unrecognized != nil { i += copy(data[i:], m.XXX_unrecognized) } diff --git a/etcdserver/etcdserverpb/etcdserver.proto b/etcdserver/etcdserverpb/etcdserver.proto index 0cf1da9df..06e1082c8 100644 --- a/etcdserver/etcdserverpb/etcdserver.proto +++ b/etcdserver/etcdserverpb/etcdserver.proto @@ -27,5 +27,6 @@ message Request { } message Metadata { - required uint64 NodeID = 1 [(gogoproto.nullable) = false]; + required uint64 NodeID = 1 [(gogoproto.nullable) = false]; + required uint64 ClusterID = 2 [(gogoproto.nullable) = false]; } diff --git a/etcdserver/server.go b/etcdserver/server.go index f60ba81c8..69bc2cc8b 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -129,6 +129,7 @@ type EtcdServer struct { done chan struct{} stopped chan struct{} id uint64 + clusterID uint64 attributes Attributes ClusterStore ClusterStore @@ -167,6 +168,7 @@ func NewServer(cfg *ServerConfig) *EtcdServer { st := store.New() var w *wal.WAL var n raft.Node + var id, cid uint64 if !wal.Exist(cfg.WALDir()) { if err := cfg.VerifyBootstrapConfig(); err != nil { log.Fatalf("etcdserver: %v", err) @@ -184,7 +186,7 @@ func NewServer(cfg *ServerConfig) *EtcdServer { log.Fatalf("etcdserver: %v", err) } } - n, w = startNode(cfg) + id, cid, n, w = startNode(cfg) } else { if cfg.ShouldDiscover() { log.Printf("etcdserver: warn: ignoring discovery: etcd has already been initialized and has a valid log in %q", cfg.WALDir()) @@ -199,7 +201,7 @@ func NewServer(cfg *ServerConfig) *EtcdServer { st.Recovery(snapshot.Data) index = snapshot.Index } - n, w = restartNode(cfg, index, snapshot) + id, cid, n, w = restartNode(cfg, index, snapshot) } cls := &clusterStore{Store: st} @@ -213,7 +215,8 @@ func NewServer(cfg *ServerConfig) *EtcdServer { s := &EtcdServer{ store: st, node: n, - id: cfg.ID(), + id: id, + clusterID: cid, attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, storage: struct { *wal.WAL @@ -626,27 +629,31 @@ func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) { s.storage.Cut() } -func startNode(cfg *ServerConfig) (n raft.Node, w *wal.WAL) { +func startNode(cfg *ServerConfig) (id, cid uint64, n raft.Node, w *wal.WAL) { var err error - metadata := pbutil.MustMarshal(&pb.Metadata{NodeID: cfg.ID()}) + // TODO: remove the discoveryURL when it becomes part of the source for + // generating nodeID. + cfg.Cluster.GenID([]byte(cfg.DiscoveryURL)) + metadata := pbutil.MustMarshal(&pb.Metadata{NodeID: cfg.ID(), ClusterID: cfg.Cluster.ID()}) if w, err = wal.Create(cfg.WALDir(), metadata); err != nil { log.Fatal(err) } - ids := cfg.Cluster.IDs() + ids := cfg.Cluster.MemberIDs() peers := make([]raft.Peer, len(ids)) for i, id := range ids { - ctx, err := json.Marshal((*cfg.Cluster)[id]) + ctx, err := json.Marshal((*cfg.Cluster).FindID(id)) if err != nil { log.Fatal(err) } peers[i] = raft.Peer{ID: id, Context: ctx} } - log.Printf("etcdserver: start node %d", cfg.ID()) + id, cid = cfg.ID(), cfg.Cluster.ID() + log.Printf("etcdserver: start node %d in cluster %d", id, cid) n = raft.StartNode(cfg.ID(), peers, 10, 1) return } -func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (n raft.Node, w *wal.WAL) { +func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id, cid uint64, n raft.Node, w *wal.WAL) { var err error // restart a node from previous wal if w, err = wal.OpenAtIndex(cfg.WALDir(), index); err != nil { @@ -659,8 +666,9 @@ func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (n var metadata pb.Metadata pbutil.MustUnmarshal(&metadata, wmetadata) - log.Printf("etcdserver: restart node %d at commit index %d", metadata.NodeID, st.Commit) - n = raft.RestartNode(metadata.NodeID, 10, 1, snapshot, st, ents) + id, cid = metadata.NodeID, metadata.ClusterID + log.Printf("etcdserver: restart node %d in cluster %d at commit index %d", id, cid, st.Commit) + n = raft.RestartNode(id, 10, 1, snapshot, st, ents) return } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index fc272f885..45858a871 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -448,7 +448,6 @@ func testServer(t *testing.T, ns uint64) { ids[i] = i + 1 } members := mustMakePeerSlice(t, ids...) - for i := uint64(0); i < ns; i++ { id := i + 1 n := raft.StartNode(id, members, 10, 1) @@ -783,11 +782,12 @@ func TestTriggerSnap(t *testing.T) { gaction := p.Action() // each operation is recorded as a Save // BootstrapConfig/Nop + (SnapCount - 1) * Puts + Cut + SaveSnap = Save + (SnapCount - 1) * Save + Cut + SaveSnap - if len(gaction) != 2+int(s.snapCount) { - t.Fatalf("len(action) = %d, want %d", len(gaction), 2+int(s.snapCount)) + wcnt := 2 + int(s.snapCount) + if len(gaction) != wcnt { + t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt) } - if !reflect.DeepEqual(gaction[11], action{name: "SaveSnap"}) { - t.Errorf("action = %s, want SaveSnap", gaction[11]) + if !reflect.DeepEqual(gaction[wcnt-1], action{name: "SaveSnap"}) { + t.Errorf("action = %s, want SaveSnap", gaction[wcnt-1]) } } @@ -1307,7 +1307,7 @@ func (cs *clusterStoreRecorder) Add(m Member) { } func (cs *clusterStoreRecorder) Get() Cluster { cs.record(action{name: "Get"}) - return nil + return Cluster{} } func (cs *clusterStoreRecorder) Remove(id uint64) { cs.record(action{name: "Remove", params: []interface{}{id}}) diff --git a/raft/node.go b/raft/node.go index 06da88fc5..5a5fcfb19 100644 --- a/raft/node.go +++ b/raft/node.go @@ -144,17 +144,16 @@ func StartNode(id uint64, peers []Peer, election, heartbeat int) Node { n := newNode() r := newRaft(id, nil, election, heartbeat) - ents := make([]pb.Entry, len(peers)) - for i, peer := range peers { + for _, peer := range peers { cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context} - data, err := cc.Marshal() + d, err := cc.Marshal() if err != nil { panic("unexpected marshal error") } - ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data} + e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d} + r.raftLog.append(r.raftLog.lastIndex(), e) } - r.raftLog.append(0, ents...) - r.raftLog.committed = uint64(len(ents)) + r.raftLog.committed = r.raftLog.lastIndex() go n.run(r) return &n diff --git a/raft/node_test.go b/raft/node_test.go index fd63abce7..3ae0fc901 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -190,7 +190,6 @@ func TestNode(t *testing.T) { CommittedEntries: []raftpb.Entry{{Term: 1, Index: 3, Data: []byte("foo")}}, }, } - n := StartNode(1, []Peer{{ID: 1}}, 10, 1) n.ApplyConfChange(cc) n.Campaign(ctx)