server: use random generated node id

Yicheng Qin 2014-08-16 10:33:24 -07:00
parent a5df254e53
commit c1da78601a
7 changed files with 67 additions and 59 deletions
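This commit removes the externally assigned server id and instead generates the node id at random when a participant is created (see the newParticipant change below). The diff relies on an existing genId() helper whose body is not shown here; a minimal sketch of such a generator, assuming a non-cryptographic math/rand source is acceptable, might look like:

package etcd

import (
	"math/rand"
	"time"
)

// genId returns a pseudo-random int64 used as a node id.
// Illustrative sketch only; the actual genId() referenced in the diff
// is defined elsewhere in the repository and may differ.
func genId() int64 {
	r := rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
	return r.Int63()
}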

View File

@ -37,7 +37,7 @@ const (
type Server struct {
cfg *conf.Config
id int64
name string
pubAddr string
raftPubAddr string
tickDuration time.Duration
@ -80,7 +80,7 @@ func New(c *conf.Config) (*Server, error) {
s := &Server{
cfg: c,
id: genId(),
name: c.Name,
pubAddr: c.Addr,
raftPubAddr: c.Peer.Addr,
tickDuration: defaultTickDuration,
@ -92,13 +92,14 @@ func New(c *conf.Config) (*Server, error) {
exited: make(chan error, 1),
stopNotifyc: make(chan struct{}),
}
s.peerHub = newPeerHub(s.id, client)
followersStats := NewRaftFollowersStats(s.name)
s.peerHub = newPeerHub(client, followersStats)
m := http.NewServeMux()
m.HandleFunc("/", s.requestHandler)
m.HandleFunc("/version", versionHandler)
s.Handler = m
log.Printf("id=%x server.new raftPubAddr=%s\n", s.id, s.raftPubAddr)
log.Printf("name=%s server.new raftPubAddr=%s\n", s.name, s.raftPubAddr)
if err = os.MkdirAll(s.cfg.DataDir, 0700); err != nil {
if !os.IsExist(err) {
return nil, err
@ -109,7 +110,7 @@ func New(c *conf.Config) (*Server, error) {
func (s *Server) SetTick(tick time.Duration) {
s.tickDuration = tick
log.Printf("id=%x server.setTick tick=%q\n", s.id, s.tickDuration)
log.Printf("name=%s server.setTick tick=%q\n", s.name, s.tickDuration)
}
// Stop stops the server elegantly.
@ -118,7 +119,7 @@ func (s *Server) Stop() error {
close(s.stopNotifyc)
err := <-s.exited
s.client.CloseConnections()
log.Printf("id=%x server.stop\n", s.id)
log.Printf("name=%s server.stop\n", s.name)
return err
}
@ -159,17 +160,17 @@ func (s *Server) Run() error {
exit = err
return fmt.Errorf("bad discovery URL error: %v", err)
}
d = newDiscoverer(u, fmt.Sprint(s.id), s.raftPubAddr)
d = newDiscoverer(u, s.name, s.raftPubAddr)
if seeds, err = d.discover(); err != nil {
exit = err
return err
}
log.Printf("id=%x server.run source=-discovery seeds=\"%v\"\n", s.id, seeds)
log.Printf("name=%s server.run source=-discovery seeds=%q", s.name, seeds)
} else {
for _, p := range s.cfg.Peers {
u, err := url.Parse(p)
if err != nil {
log.Printf("id=%x server.run err=%q", err)
log.Printf("name=%s server.run err=%q", s.name, err)
continue
}
if u.Scheme == "" {
@ -177,7 +178,7 @@ func (s *Server) Run() error {
}
seeds = append(seeds, u.String())
}
log.Printf("id=%x server.run source=-peers seeds=\"%v\"\n", s.id, seeds)
log.Printf("name=%s server.run source=-peers seeds=%q", s.name, seeds)
}
s.peerHub.setSeeds(seeds)
@ -185,15 +186,15 @@ func (s *Server) Run() error {
for {
switch next {
case participantMode:
p, err := newParticipant(s.id, s.cfg, s.client, s.peerHub, s.tickDuration)
p, err := newParticipant(s.cfg, s.client, s.peerHub, s.tickDuration)
if err != nil {
log.Printf("id=%x server.run newParicipanteErr=\"%v\"\n", s.id, err)
log.Printf("name=%s server.run newParicipanteErr=\"%v\"\n", s.name, err)
exit = err
return err
}
s.p = p
s.mode.Set(participantMode)
log.Printf("id=%x server.run mode=participantMode\n", s.id)
log.Printf("name=%s server.run mode=participantMode\n", s.name)
dStopc := make(chan struct{})
if d != nil {
go d.heartbeat(dStopc)
@ -206,7 +207,7 @@ func (s *Server) Run() error {
case standbyMode:
s.s = newStandby(s.client, s.peerHub)
s.mode.Set(standbyMode)
log.Printf("id=%x server.run mode=standbyMode\n", s.id)
log.Printf("name=%s server.run mode=standbyMode\n", s.name)
s.s.run(s.stopNotifyc)
next = participantMode
default:
@ -215,12 +216,5 @@ func (s *Server) Run() error {
if s.mode.Get() == stopMode {
return nil
}
s.id = genId()
}
}
// setId sets the id for the participant. This should only be used for testing.
func (s *Server) setId(id int64) {
log.Printf("id=%x server.setId oldId=%x\n", id, s.id)
s.id = id
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package etcd
import (
"fmt"
"math/rand"
"net/url"
"reflect"
@ -36,14 +37,14 @@ func TestKillLeader(t *testing.T) {
cl.Start()
for j := 0; j < tt; j++ {
lead, _ := cl.Leader()
cl.Node(int(lead)).Stop()
cl.Node(lead).Stop()
// wait for leader election timeout
time.Sleep(cl.Node(0).e.tickDuration * defaultElection * 2)
if g, _ := cl.Leader(); g == lead {
t.Errorf("#%d.%d: lead = %d, want not %d", i, j, g, lead)
}
cl.Node(int(lead)).Start()
cl.Node(int(lead)).WaitMode(participantMode)
cl.Node(lead).Start()
cl.Node(lead).WaitMode(participantMode)
}
cl.Destroy()
}
@ -92,8 +93,9 @@ func TestJoinThroughFollower(t *testing.T) {
for i := 1; i < tt; i++ {
c := newTestConfig()
c.Name = fmt.Sprint(i)
c.Peers = []string{seed}
ts := &testServer{Config: c, Id: int64(i)}
ts := &testServer{Config: c}
ts.Start()
ts.WaitMode(participantMode)
cl.nodes = append(cl.nodes, ts)
@ -119,8 +121,9 @@ func TestJoinWithoutHTTPScheme(t *testing.T) {
for i := 1; i < 3; i++ {
c := newTestConfig()
c.Name = "server-" + fmt.Sprint(i)
c.Peers = []string{seed}
ts := &testServer{Config: c, Id: int64(i)}
ts := &testServer{Config: c}
ts.Start()
ts.WaitMode(participantMode)
cl.nodes = append(cl.nodes, ts)
@ -140,7 +143,7 @@ func TestClusterConfigReload(t *testing.T) {
cc := conf.NewClusterConfig()
cc.ActiveSize = 15
cc.RemoveDelay = 60
if err := cl.Participant(int(lead)).setClusterConfig(cc); err != nil {
if err := cl.Participant(lead).setClusterConfig(cc); err != nil {
t.Fatalf("setClusterConfig err = %v", err)
}
@ -150,7 +153,7 @@ func TestClusterConfigReload(t *testing.T) {
lead, _ = cl.Leader()
// wait for msgAppResp to commit all entries
time.Sleep(2 * defaultHeartbeat * cl.Participant(0).tickDuration)
if g := cl.Participant(int(lead)).clusterConfig(); !reflect.DeepEqual(g, cc) {
if g := cl.Participant(lead).clusterConfig(); !reflect.DeepEqual(g, cc) {
t.Errorf("clusterConfig = %+v, want %+v", g, cc)
}
}

View File

@ -28,7 +28,7 @@ import (
)
const (
bootstrapId = 0xBEEF
bootstrapName = "BEEF"
)
type garbageHandler struct {
@ -39,7 +39,7 @@ type garbageHandler struct {
func (g *garbageHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "Hello, client")
wp := fmt.Sprint("/v2/keys/_etcd/registry/1/", bootstrapId)
wp := fmt.Sprint("/v2/keys/_etcd/registry/1/", bootstrapName)
if gp := r.URL.String(); gp != wp {
g.t.Fatalf("url = %s, want %s", gp, wp)
}
@ -56,8 +56,9 @@ func TestBadDiscoveryService(t *testing.T) {
defer httpts.Close()
c := newTestConfig()
c.Name = bootstrapName
c.Discovery = httpts.URL + "/v2/keys/_etcd/registry/1"
ts := testServer{Config: c, Id: bootstrapId}
ts := testServer{Config: c}
ts.Start()
err := ts.Destroy()
@ -80,9 +81,10 @@ func TestBadDiscoveryServiceWithAdvisedPeers(t *testing.T) {
defer httpts.Close()
c := newTestConfig()
c.Name = bootstrapName
c.Discovery = httpts.URL + "/v2/keys/_etcd/registry/1"
c.Peers = []string{"a peer"}
ts := testServer{Config: c, Id: bootstrapId}
ts := testServer{Config: c}
ts.Start()
err := ts.Destroy()
@ -94,13 +96,12 @@ func TestBadDiscoveryServiceWithAdvisedPeers(t *testing.T) {
func TestBootstrapByEmptyPeers(t *testing.T) {
defer afterTest(t)
id := genId()
ts := testServer{Id: id}
ts := testServer{}
ts.Start()
defer ts.Destroy()
ts.WaitMode(participantMode)
if ts.Participant().node.Leader() != id {
t.Errorf("leader = %x, want %x", ts.Participant().node.Leader(), id)
if ts.Participant().node.Leader() != ts.Participant().id {
t.Errorf("leader = %x, want %x", ts.Participant().node.Leader(), ts.Participant().id)
}
}
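With the Id field removed from testServer, tests follow the pattern shown above: start the server without a preset id, wait for participant mode, then read the generated id from the participant. A minimal usage sketch based on the test helpers in this diff:

c := newTestConfig()
ts := testServer{Config: c}        // no pre-assigned Id
ts.Start()
defer ts.Destroy()
ts.WaitMode(participantMode)       // the id exists only once the participant is running
id := ts.Participant().id          // read the randomly generated id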
@ -111,8 +112,9 @@ func TestBootstrapByDiscoveryService(t *testing.T) {
defer discoverService.Destroy()
c := newTestConfig()
c.Name = bootstrapName
c.Discovery = discoverService.URL(0) + "/v2/keys/_etcd/registry/1"
ts := testServer{Id: bootstrapId, Config: c}
ts := testServer{Config: c}
ts.Start()
ts.WaitMode(participantMode)
err := ts.Destroy()
@ -147,8 +149,9 @@ func TestRunByDiscoveryService(t *testing.T) {
resp.Body.Close()
c := newTestConfig()
c.Name = bootstrapName
c.Discovery = ds.URL(0) + "/v2/keys/_etcd/registry/1"
ts := testServer{Config: c, Id: bootstrapId}
ts := testServer{Config: c}
ts.Start()
defer ts.Destroy()

View File

@ -94,7 +94,7 @@ func TestRemove(t *testing.T) {
lead, _ := cl.Leader()
config := conf.NewClusterConfig()
config.ActiveSize = 0
if err := cl.Participant(int(lead)).setClusterConfig(config); err != nil {
if err := cl.Participant(lead).setClusterConfig(config); err != nil {
t.Fatalf("#%d: setClusterConfig err = %v", k, err)
}
@ -102,9 +102,9 @@ func TestRemove(t *testing.T) {
// not 100 percent safe in our raft.
// TODO(yichengq): improve it later.
for i := 0; i < tt-2; i++ {
id := int64(i)
id := cl.Id(i)
for {
n := cl.Node(int(id))
n := cl.Node(i)
if n.e.mode.Get() == standbyMode {
break
}
@ -189,7 +189,7 @@ func TestVersionCheck(t *testing.T) {
func TestSingleNodeRecovery(t *testing.T) {
defer afterTest(t)
c := newTestConfig()
ts := testServer{Id: genId(), Config: c}
ts := testServer{Config: c}
ts.Start()
defer ts.Destroy()
@ -202,7 +202,7 @@ func TestSingleNodeRecovery(t *testing.T) {
}
ts.Stop()
ts = testServer{Id: ts.Id, Config: c}
ts = testServer{Config: c}
ts.Start()
ts.WaitMode(participantMode)
w, err := ts.Participant().Store.Watch(key, false, false, ev.Index())
@ -248,8 +248,9 @@ func TestRestoreSnapshotFromLeader(t *testing.T) {
// create one to join the cluster
c := newTestConfig()
c.Name = "1"
c.Peers = []string{cl.URL(0)}
ts := testServer{Config: c, Id: 1}
ts := testServer{Config: c}
ts.Start()
defer ts.Destroy()
ts.WaitMode(participantMode)
@ -300,16 +301,18 @@ func (c *testCluster) Start() {
nodes := make([]*testServer, c.Size)
c.nodes = nodes
nodes[0] = &testServer{Id: 0, TLS: c.TLS}
cfg := newTestConfig()
cfg.Name = "testServer-0"
nodes[0] = &testServer{Config: cfg, TLS: c.TLS}
nodes[0].Start()
nodes[0].WaitMode(participantMode)
seed := nodes[0].URL
for i := 1; i < c.Size; i++ {
cfg := newTestConfig()
cfg.Name = "testServer-" + fmt.Sprint(i)
cfg.Peers = []string{seed}
id := int64(i)
s := &testServer{Config: cfg, Id: id, TLS: c.TLS}
s := &testServer{Config: cfg, TLS: c.TLS}
s.Start()
nodes[i] = s
@ -332,7 +335,7 @@ func (c *testCluster) wait() {
for i := 0; i < size; i++ {
for k := 0; k < size; k++ {
s := c.Node(i)
wp := v2machineKVPrefix + fmt.Sprintf("/%d", c.Node(k).Id)
wp := v2machineKVPrefix + fmt.Sprintf("/%d", c.Id(k))
w, err := s.Participant().Watch(wp, false, false, 1)
if err != nil {
panic(err)
@ -365,6 +368,10 @@ func (c *testCluster) URL(i int) string {
return c.nodes[i].h.URL
}
func (c *testCluster) Id(i int) int64 {
return c.Participant(i).id
}
func (c *testCluster) Restart() {
for _, s := range c.nodes {
s.Start()
@ -383,20 +390,23 @@ func (c *testCluster) Destroy() {
}
}
func (c *testCluster) Leader() (lead, term int64) {
// Leader returns the index of leader in the cluster and its leader term.
func (c *testCluster) Leader() (leadIdx int, term int64) {
ids := make(map[int64]int)
for {
ls := make([]leadterm, 0, c.Size)
for i := range c.nodes {
switch c.Node(i).e.mode.Get() {
case participantMode:
ls = append(ls, c.Node(i).Lead())
ids[c.Id(i)] = i
case standbyMode:
//TODO(xiangli) add standby support
case stopMode:
}
}
if isSameLead(ls) {
return ls[0].lead, ls[0].term
return ids[ls[0].lead], ls[0].term
}
time.Sleep(c.Node(0).e.tickDuration * defaultElection)
}
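Since Leader() now returns the node's index in the cluster rather than its raft id, callers can index into the cluster directly without an int(lead) conversion, as the updated tests above do. A short usage sketch:

lead, _ := cl.Leader()                      // lead is an index into the cluster's nodes
cl.Node(lead).Stop()                        // index-based node access
cfg := cl.Participant(lead).clusterConfig() // participant lookup also takes the index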
@ -424,7 +434,6 @@ func isSameLead(ls []leadterm) bool {
type testServer struct {
Config *conf.Config
Id int64
TLS bool
// base URL of form http://ipaddr:port with no trailing slash
@ -452,7 +461,6 @@ func (s *testServer) Start() {
panic(err)
}
s.e = e
e.setId(s.Id)
tick := time.Duration(c.Peer.HeartbeatInterval) * time.Millisecond
e.SetTick(tick)

View File

@ -87,7 +87,7 @@ type participant struct {
*http.ServeMux
}
func newParticipant(id int64, c *conf.Config, client *v2client, peerHub *peerHub, tickDuration time.Duration) (*participant, error) {
func newParticipant(c *conf.Config, client *v2client, peerHub *peerHub, tickDuration time.Duration) (*participant, error) {
p := &participant{
clusterId: -1,
cfg: c,
@ -103,7 +103,7 @@ func newParticipant(id int64, c *conf.Config, client *v2client, peerHub *peerHub
result: make(map[wait]chan interface{}),
},
Store: store.New(),
serverStats: NewRaftServerStats(fmt.Sprint(id)),
serverStats: NewRaftServerStats(c.Name),
stopNotifyc: make(chan struct{}),
@ -119,7 +119,7 @@ func newParticipant(id int64, c *conf.Config, client *v2client, peerHub *peerHub
return nil, err
}
p.id = id
p.id = genId()
p.pubAddr = c.Addr
p.raftPubAddr = c.Peer.Addr
if w, err = wal.New(walPath); err != nil {

View File

@ -48,12 +48,12 @@ type peerHub struct {
serverStats *raftServerStats
}
func newPeerHub(id int64, c *http.Client) *peerHub {
func newPeerHub(c *http.Client, followersStats *raftFollowersStats) *peerHub {
h := &peerHub{
peers: make(map[int64]*peer),
seeds: make(map[string]bool),
c: c,
followersStats: NewRaftFollowersStats(fmt.Sprint(id)),
followersStats: followersStats,
}
return h
}

View File

@ -205,7 +205,7 @@ func TestGetAdminMachineEndPoint(t *testing.T) {
for i := 0; i < cl.Size; i++ {
for j := 0; j < cl.Size; j++ {
name := fmt.Sprint(cl.Node(i).Id)
name := fmt.Sprint(cl.Id(i))
r, err := http.Get(cl.URL(j) + v2adminMachinesPrefix + name)
if err != nil {
t.Errorf("%v", err)
@ -250,7 +250,7 @@ func TestGetAdminMachinesEndPoint(t *testing.T) {
w := make([]*machineMessage, cl.Size)
for i := 0; i < cl.Size; i++ {
w[i] = &machineMessage{
Name: fmt.Sprint(cl.Node(i).Id),
Name: fmt.Sprint(cl.Id(i)),
State: stateFollower,
ClientURL: cl.URL(i),
PeerURL: cl.URL(i),