From c1da78601abd32de47c511415ee551f96d51c044 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Sat, 16 Aug 2014 10:33:24 -0700 Subject: [PATCH] server: use random generated node id --- etcd/etcd.go | 36 +++++++++++++++-------------------- etcd/etcd_functional_test.go | 17 ++++++++++------- etcd/etcd_start_test.go | 23 ++++++++++++---------- etcd/etcd_test.go | 36 +++++++++++++++++++++-------------- etcd/participant.go | 6 +++--- etcd/peer_hub.go | 4 ++-- etcd/v2_http_endpoint_test.go | 4 ++-- 7 files changed, 67 insertions(+), 59 deletions(-) diff --git a/etcd/etcd.go b/etcd/etcd.go index ced79ebe4..3664d231f 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -37,7 +37,7 @@ const ( type Server struct { cfg *conf.Config - id int64 + name string pubAddr string raftPubAddr string tickDuration time.Duration @@ -80,7 +80,7 @@ func New(c *conf.Config) (*Server, error) { s := &Server{ cfg: c, - id: genId(), + name: c.Name, pubAddr: c.Addr, raftPubAddr: c.Peer.Addr, tickDuration: defaultTickDuration, @@ -92,13 +92,14 @@ func New(c *conf.Config) (*Server, error) { exited: make(chan error, 1), stopNotifyc: make(chan struct{}), } - s.peerHub = newPeerHub(s.id, client) + followersStats := NewRaftFollowersStats(s.name) + s.peerHub = newPeerHub(client, followersStats) m := http.NewServeMux() m.HandleFunc("/", s.requestHandler) m.HandleFunc("/version", versionHandler) s.Handler = m - log.Printf("id=%x server.new raftPubAddr=%s\n", s.id, s.raftPubAddr) + log.Printf("name=%s server.new raftPubAddr=%s\n", s.name, s.raftPubAddr) if err = os.MkdirAll(s.cfg.DataDir, 0700); err != nil { if !os.IsExist(err) { return nil, err @@ -109,7 +110,7 @@ func New(c *conf.Config) (*Server, error) { func (s *Server) SetTick(tick time.Duration) { s.tickDuration = tick - log.Printf("id=%x server.setTick tick=%q\n", s.id, s.tickDuration) + log.Printf("name=%s server.setTick tick=%q\n", s.name, s.tickDuration) } // Stop stops the server elegently. @@ -118,7 +119,7 @@ func (s *Server) Stop() error { close(s.stopNotifyc) err := <-s.exited s.client.CloseConnections() - log.Printf("id=%x server.stop\n", s.id) + log.Printf("name=%s server.stop\n", s.name) return err } @@ -159,17 +160,17 @@ func (s *Server) Run() error { exit = err return fmt.Errorf("bad discovery URL error: %v", err) } - d = newDiscoverer(u, fmt.Sprint(s.id), s.raftPubAddr) + d = newDiscoverer(u, s.name, s.raftPubAddr) if seeds, err = d.discover(); err != nil { exit = err return err } - log.Printf("id=%x server.run source=-discovery seeds=\"%v\"\n", s.id, seeds) + log.Printf("name=%s server.run source=-discovery seeds=%q", s.name, seeds) } else { for _, p := range s.cfg.Peers { u, err := url.Parse(p) if err != nil { - log.Printf("id=%x server.run err=%q", err) + log.Printf("name=%s server.run err=%q", s.name, err) continue } if u.Scheme == "" { @@ -177,7 +178,7 @@ func (s *Server) Run() error { } seeds = append(seeds, u.String()) } - log.Printf("id=%x server.run source=-peers seeds=\"%v\"\n", s.id, seeds) + log.Printf("name=%s server.run source=-peers seeds=%q", s.name, seeds) } s.peerHub.setSeeds(seeds) @@ -185,15 +186,15 @@ func (s *Server) Run() error { for { switch next { case participantMode: - p, err := newParticipant(s.id, s.cfg, s.client, s.peerHub, s.tickDuration) + p, err := newParticipant(s.cfg, s.client, s.peerHub, s.tickDuration) if err != nil { - log.Printf("id=%x server.run newParicipanteErr=\"%v\"\n", s.id, err) + log.Printf("name=%s server.run newParicipanteErr=\"%v\"\n", s.name, err) exit = err return err } s.p = p s.mode.Set(participantMode) - log.Printf("id=%x server.run mode=participantMode\n", s.id) + log.Printf("name=%s server.run mode=participantMode\n", s.name) dStopc := make(chan struct{}) if d != nil { go d.heartbeat(dStopc) @@ -206,7 +207,7 @@ func (s *Server) Run() error { case standbyMode: s.s = newStandby(s.client, s.peerHub) s.mode.Set(standbyMode) - log.Printf("id=%x server.run mode=standbyMode\n", s.id) + log.Printf("name=%s server.run mode=standbyMode\n", s.name) s.s.run(s.stopNotifyc) next = participantMode default: @@ -215,12 +216,5 @@ func (s *Server) Run() error { if s.mode.Get() == stopMode { return nil } - s.id = genId() } } - -// setId sets the id for the participant. This should only be used for testing. -func (s *Server) setId(id int64) { - log.Printf("id=%x server.setId oldId=%x\n", id, s.id) - s.id = id -} diff --git a/etcd/etcd_functional_test.go b/etcd/etcd_functional_test.go index cf7826f8b..58a5096bf 100644 --- a/etcd/etcd_functional_test.go +++ b/etcd/etcd_functional_test.go @@ -17,6 +17,7 @@ limitations under the License. package etcd import ( + "fmt" "math/rand" "net/url" "reflect" @@ -36,14 +37,14 @@ func TestKillLeader(t *testing.T) { cl.Start() for j := 0; j < tt; j++ { lead, _ := cl.Leader() - cl.Node(int(lead)).Stop() + cl.Node(lead).Stop() // wait for leader election timeout time.Sleep(cl.Node(0).e.tickDuration * defaultElection * 2) if g, _ := cl.Leader(); g == lead { t.Errorf("#%d.%d: lead = %d, want not %d", i, j, g, lead) } - cl.Node(int(lead)).Start() - cl.Node(int(lead)).WaitMode(participantMode) + cl.Node(lead).Start() + cl.Node(lead).WaitMode(participantMode) } cl.Destroy() } @@ -92,8 +93,9 @@ func TestJoinThroughFollower(t *testing.T) { for i := 1; i < tt; i++ { c := newTestConfig() + c.Name = fmt.Sprint(i) c.Peers = []string{seed} - ts := &testServer{Config: c, Id: int64(i)} + ts := &testServer{Config: c} ts.Start() ts.WaitMode(participantMode) cl.nodes = append(cl.nodes, ts) @@ -119,8 +121,9 @@ func TestJoinWithoutHTTPScheme(t *testing.T) { for i := 1; i < 3; i++ { c := newTestConfig() + c.Name = "server-" + fmt.Sprint(i) c.Peers = []string{seed} - ts := &testServer{Config: c, Id: int64(i)} + ts := &testServer{Config: c} ts.Start() ts.WaitMode(participantMode) cl.nodes = append(cl.nodes, ts) @@ -140,7 +143,7 @@ func TestClusterConfigReload(t *testing.T) { cc := conf.NewClusterConfig() cc.ActiveSize = 15 cc.RemoveDelay = 60 - if err := cl.Participant(int(lead)).setClusterConfig(cc); err != nil { + if err := cl.Participant(lead).setClusterConfig(cc); err != nil { t.Fatalf("setClusterConfig err = %v", err) } @@ -150,7 +153,7 @@ func TestClusterConfigReload(t *testing.T) { lead, _ = cl.Leader() // wait for msgAppResp to commit all entries time.Sleep(2 * defaultHeartbeat * cl.Participant(0).tickDuration) - if g := cl.Participant(int(lead)).clusterConfig(); !reflect.DeepEqual(g, cc) { + if g := cl.Participant(lead).clusterConfig(); !reflect.DeepEqual(g, cc) { t.Errorf("clusterConfig = %+v, want %+v", g, cc) } } diff --git a/etcd/etcd_start_test.go b/etcd/etcd_start_test.go index 1eac74061..6b5140fad 100644 --- a/etcd/etcd_start_test.go +++ b/etcd/etcd_start_test.go @@ -28,7 +28,7 @@ import ( ) const ( - bootstrapId = 0xBEEF + bootstrapName = "BEEF" ) type garbageHandler struct { @@ -39,7 +39,7 @@ type garbageHandler struct { func (g *garbageHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, "Hello, client") - wp := fmt.Sprint("/v2/keys/_etcd/registry/1/", bootstrapId) + wp := fmt.Sprint("/v2/keys/_etcd/registry/1/", bootstrapName) if gp := r.URL.String(); gp != wp { g.t.Fatalf("url = %s, want %s", gp, wp) } @@ -56,8 +56,9 @@ func TestBadDiscoveryService(t *testing.T) { defer httpts.Close() c := newTestConfig() + c.Name = bootstrapName c.Discovery = httpts.URL + "/v2/keys/_etcd/registry/1" - ts := testServer{Config: c, Id: bootstrapId} + ts := testServer{Config: c} ts.Start() err := ts.Destroy() @@ -80,9 +81,10 @@ func TestBadDiscoveryServiceWithAdvisedPeers(t *testing.T) { defer httpts.Close() c := newTestConfig() + c.Name = bootstrapName c.Discovery = httpts.URL + "/v2/keys/_etcd/registry/1" c.Peers = []string{"a peer"} - ts := testServer{Config: c, Id: bootstrapId} + ts := testServer{Config: c} ts.Start() err := ts.Destroy() @@ -94,13 +96,12 @@ func TestBadDiscoveryServiceWithAdvisedPeers(t *testing.T) { func TestBootstrapByEmptyPeers(t *testing.T) { defer afterTest(t) - id := genId() - ts := testServer{Id: id} + ts := testServer{} ts.Start() defer ts.Destroy() ts.WaitMode(participantMode) - if ts.Participant().node.Leader() != id { - t.Errorf("leader = %x, want %x", ts.Participant().node.Leader(), id) + if ts.Participant().node.Leader() != ts.Participant().id { + t.Errorf("leader = %x, want %x", ts.Participant().node.Leader(), ts.Participant().id) } } @@ -111,8 +112,9 @@ func TestBootstrapByDiscoveryService(t *testing.T) { defer discoverService.Destroy() c := newTestConfig() + c.Name = bootstrapName c.Discovery = discoverService.URL(0) + "/v2/keys/_etcd/registry/1" - ts := testServer{Id: bootstrapId, Config: c} + ts := testServer{Config: c} ts.Start() ts.WaitMode(participantMode) err := ts.Destroy() @@ -147,8 +149,9 @@ func TestRunByDiscoveryService(t *testing.T) { resp.Body.Close() c := newTestConfig() + c.Name = bootstrapName c.Discovery = ds.URL(0) + "/v2/keys/_etcd/registry/1" - ts := testServer{Config: c, Id: bootstrapId} + ts := testServer{Config: c} ts.Start() defer ts.Destroy() diff --git a/etcd/etcd_test.go b/etcd/etcd_test.go index 73aecfbcb..6476bc2bf 100644 --- a/etcd/etcd_test.go +++ b/etcd/etcd_test.go @@ -94,7 +94,7 @@ func TestRemove(t *testing.T) { lead, _ := cl.Leader() config := conf.NewClusterConfig() config.ActiveSize = 0 - if err := cl.Participant(int(lead)).setClusterConfig(config); err != nil { + if err := cl.Participant(lead).setClusterConfig(config); err != nil { t.Fatalf("#%d: setClusterConfig err = %v", k, err) } @@ -102,9 +102,9 @@ func TestRemove(t *testing.T) { // not 100 percent safe in our raft. // TODO(yichengq): improve it later. for i := 0; i < tt-2; i++ { - id := int64(i) + id := cl.Id(i) for { - n := cl.Node(int(id)) + n := cl.Node(i) if n.e.mode.Get() == standbyMode { break } @@ -189,7 +189,7 @@ func TestVersionCheck(t *testing.T) { func TestSingleNodeRecovery(t *testing.T) { defer afterTest(t) c := newTestConfig() - ts := testServer{Id: genId(), Config: c} + ts := testServer{Config: c} ts.Start() defer ts.Destroy() @@ -202,7 +202,7 @@ func TestSingleNodeRecovery(t *testing.T) { } ts.Stop() - ts = testServer{Id: ts.Id, Config: c} + ts = testServer{Config: c} ts.Start() ts.WaitMode(participantMode) w, err := ts.Participant().Store.Watch(key, false, false, ev.Index()) @@ -248,8 +248,9 @@ func TestRestoreSnapshotFromLeader(t *testing.T) { // create one to join the cluster c := newTestConfig() + c.Name = "1" c.Peers = []string{cl.URL(0)} - ts := testServer{Config: c, Id: 1} + ts := testServer{Config: c} ts.Start() defer ts.Destroy() ts.WaitMode(participantMode) @@ -300,16 +301,18 @@ func (c *testCluster) Start() { nodes := make([]*testServer, c.Size) c.nodes = nodes - nodes[0] = &testServer{Id: 0, TLS: c.TLS} + cfg := newTestConfig() + cfg.Name = "testServer-0" + nodes[0] = &testServer{Config: cfg, TLS: c.TLS} nodes[0].Start() nodes[0].WaitMode(participantMode) seed := nodes[0].URL for i := 1; i < c.Size; i++ { cfg := newTestConfig() + cfg.Name = "testServer-" + fmt.Sprint(i) cfg.Peers = []string{seed} - id := int64(i) - s := &testServer{Config: cfg, Id: id, TLS: c.TLS} + s := &testServer{Config: cfg, TLS: c.TLS} s.Start() nodes[i] = s @@ -332,7 +335,7 @@ func (c *testCluster) wait() { for i := 0; i < size; i++ { for k := 0; k < size; k++ { s := c.Node(i) - wp := v2machineKVPrefix + fmt.Sprintf("/%d", c.Node(k).Id) + wp := v2machineKVPrefix + fmt.Sprintf("/%d", c.Id(k)) w, err := s.Participant().Watch(wp, false, false, 1) if err != nil { panic(err) @@ -365,6 +368,10 @@ func (c *testCluster) URL(i int) string { return c.nodes[i].h.URL } +func (c *testCluster) Id(i int) int64 { + return c.Participant(i).id +} + func (c *testCluster) Restart() { for _, s := range c.nodes { s.Start() @@ -383,20 +390,23 @@ func (c *testCluster) Destroy() { } } -func (c *testCluster) Leader() (lead, term int64) { +// Leader returns the index of leader in the cluster and its leader term. +func (c *testCluster) Leader() (leadIdx int, term int64) { + ids := make(map[int64]int) for { ls := make([]leadterm, 0, c.Size) for i := range c.nodes { switch c.Node(i).e.mode.Get() { case participantMode: ls = append(ls, c.Node(i).Lead()) + ids[c.Id(i)] = i case standbyMode: //TODO(xiangli) add standby support case stopMode: } } if isSameLead(ls) { - return ls[0].lead, ls[0].term + return ids[ls[0].lead], ls[0].term } time.Sleep(c.Node(0).e.tickDuration * defaultElection) } @@ -424,7 +434,6 @@ func isSameLead(ls []leadterm) bool { type testServer struct { Config *conf.Config - Id int64 TLS bool // base URL of form http://ipaddr:port with no trailing slash @@ -452,7 +461,6 @@ func (s *testServer) Start() { panic(err) } s.e = e - e.setId(s.Id) tick := time.Duration(c.Peer.HeartbeatInterval) * time.Millisecond e.SetTick(tick) diff --git a/etcd/participant.go b/etcd/participant.go index 7c0f59023..7e5222b3a 100644 --- a/etcd/participant.go +++ b/etcd/participant.go @@ -87,7 +87,7 @@ type participant struct { *http.ServeMux } -func newParticipant(id int64, c *conf.Config, client *v2client, peerHub *peerHub, tickDuration time.Duration) (*participant, error) { +func newParticipant(c *conf.Config, client *v2client, peerHub *peerHub, tickDuration time.Duration) (*participant, error) { p := &participant{ clusterId: -1, cfg: c, @@ -103,7 +103,7 @@ func newParticipant(id int64, c *conf.Config, client *v2client, peerHub *peerHub result: make(map[wait]chan interface{}), }, Store: store.New(), - serverStats: NewRaftServerStats(fmt.Sprint(id)), + serverStats: NewRaftServerStats(c.Name), stopNotifyc: make(chan struct{}), @@ -119,7 +119,7 @@ func newParticipant(id int64, c *conf.Config, client *v2client, peerHub *peerHub return nil, err } - p.id = id + p.id = genId() p.pubAddr = c.Addr p.raftPubAddr = c.Peer.Addr if w, err = wal.New(walPath); err != nil { diff --git a/etcd/peer_hub.go b/etcd/peer_hub.go index d10cf9a7c..e3ebdbd19 100644 --- a/etcd/peer_hub.go +++ b/etcd/peer_hub.go @@ -48,12 +48,12 @@ type peerHub struct { serverStats *raftServerStats } -func newPeerHub(id int64, c *http.Client) *peerHub { +func newPeerHub(c *http.Client, followersStats *raftFollowersStats) *peerHub { h := &peerHub{ peers: make(map[int64]*peer), seeds: make(map[string]bool), c: c, - followersStats: NewRaftFollowersStats(fmt.Sprint(id)), + followersStats: followersStats, } return h } diff --git a/etcd/v2_http_endpoint_test.go b/etcd/v2_http_endpoint_test.go index 42f5a0f9f..4e5d89971 100644 --- a/etcd/v2_http_endpoint_test.go +++ b/etcd/v2_http_endpoint_test.go @@ -205,7 +205,7 @@ func TestGetAdminMachineEndPoint(t *testing.T) { for i := 0; i < cl.Size; i++ { for j := 0; j < cl.Size; j++ { - name := fmt.Sprint(cl.Node(i).Id) + name := fmt.Sprint(cl.Id(i)) r, err := http.Get(cl.URL(j) + v2adminMachinesPrefix + name) if err != nil { t.Errorf("%v", err) @@ -250,7 +250,7 @@ func TestGetAdminMachinesEndPoint(t *testing.T) { w := make([]*machineMessage, cl.Size) for i := 0; i < cl.Size; i++ { w[i] = &machineMessage{ - Name: fmt.Sprint(cl.Node(i).Id), + Name: fmt.Sprint(cl.Id(i)), State: stateFollower, ClientURL: cl.URL(i), PeerURL: cl.URL(i),