From 0c95e1eabb3f97fe1205f99937b8543d2bb9474a Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 17 Apr 2014 15:36:51 -0700 Subject: [PATCH 1/3] feat(peer_server): forbid rejoining with different name Or it will confuse the cluster, especially the heartbeat between nodes. --- error/error.go | 38 +++++++++++++++------------- server/join_command.go | 18 +++++++++++++ server/peer_server.go | 28 ++++++++++++++++++++ server/registry.go | 3 +-- tests/functional/rejoin_test.go | 45 +++++++++++++++++++++++++++++++++ tests/functional/util.go | 3 +++ 6 files changed, 115 insertions(+), 20 deletions(-) diff --git a/error/error.go b/error/error.go index 4eb8da951..39791be73 100644 --- a/error/error.go +++ b/error/error.go @@ -24,15 +24,16 @@ import ( var errors = map[int]string{ // command related errors - EcodeKeyNotFound: "Key not found", - EcodeTestFailed: "Compare failed", //test and set - EcodeNotFile: "Not a file", - EcodeNoMorePeer: "Reached the max number of peers in the cluster", - EcodeNotDir: "Not a directory", - EcodeNodeExist: "Key already exists", // create - EcodeRootROnly: "Root is read only", - EcodeKeyIsPreserved: "The prefix of given key is a keyword in etcd", - EcodeDirNotEmpty: "Directory not empty", + EcodeKeyNotFound: "Key not found", + EcodeTestFailed: "Compare failed", //test and set + EcodeNotFile: "Not a file", + EcodeNoMorePeer: "Reached the max number of peers in the cluster", + EcodeNotDir: "Not a directory", + EcodeNodeExist: "Key already exists", // create + EcodeRootROnly: "Root is read only", + EcodeKeyIsPreserved: "The prefix of given key is a keyword in etcd", + EcodeDirNotEmpty: "Directory not empty", + EcodeExistingPeerAddr: "Peer address has existed", // Post form related errors EcodeValueRequired: "Value is Required in POST form", @@ -60,15 +61,16 @@ var errors = map[int]string{ } const ( - EcodeKeyNotFound = 100 - EcodeTestFailed = 101 - EcodeNotFile = 102 - EcodeNoMorePeer = 103 - EcodeNotDir = 104 - EcodeNodeExist = 105 - EcodeKeyIsPreserved = 106 - EcodeRootROnly = 107 - EcodeDirNotEmpty = 108 + EcodeKeyNotFound = 100 + EcodeTestFailed = 101 + EcodeNotFile = 102 + EcodeNoMorePeer = 103 + EcodeNotDir = 104 + EcodeNodeExist = 105 + EcodeKeyIsPreserved = 106 + EcodeRootROnly = 107 + EcodeDirNotEmpty = 108 + EcodeExistingPeerAddr = 109 EcodeValueRequired = 200 EcodePrevValueRequired = 201 diff --git a/server/join_command.go b/server/join_command.go index 4f83f4529..cfbd330a9 100644 --- a/server/join_command.go +++ b/server/join_command.go @@ -63,6 +63,15 @@ func (c *JoinCommandV1) Apply(context raft.Context) (interface{}, error) { return b, nil } + // Check if the join command adds an instance that collides with existing one on peer URL. + peerURLs := ps.registry.PeerURLs(ps.raftServer.Leader(), c.Name) + for _, peerURL := range peerURLs { + if peerURL == c.EtcdURL { + log.Warnf("%v tries to join the cluster with existing URL %v", c.Name, c.EtcdURL) + return []byte{0}, etcdErr.NewError(etcdErr.EcodeExistingPeerAddr, c.EtcdURL, context.CommitIndex()) + } + } + // Check peer number in the cluster if ps.registry.PeerCount() >= ps.ClusterConfig().ActiveSize { log.Debug("Reject join request from ", c.Name) @@ -137,6 +146,15 @@ func (c *JoinCommandV2) Apply(context raft.Context) (interface{}, error) { return json.Marshal(msg) } + // Check if the join command adds an instance that collides with existing one on peer URL. + peerURLs := ps.registry.PeerURLs(ps.raftServer.Leader(), c.Name) + for _, peerURL := range peerURLs { + if peerURL == c.PeerURL { + log.Warnf("%v tries to join the cluster with existing URL %v", c.Name, c.PeerURL) + return []byte{0}, etcdErr.NewError(etcdErr.EcodeExistingPeerAddr, c.PeerURL, context.CommitIndex()) + } + } + // Check peer number in the cluster. if ps.registry.PeerCount() >= ps.ClusterConfig().ActiveSize { log.Debug("Join as standby ", c.Name) diff --git a/server/peer_server.go b/server/peer_server.go index dcf7cc575..bb096b2fd 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -10,6 +10,7 @@ import ( "net/url" "sort" "strconv" + "strings" "sync" "time" @@ -187,6 +188,12 @@ func (s *PeerServer) findCluster(discoverURL string, peers []string) { // Try its best to find possible peers, and connect with them. if !isNewNode { + // It is not allowed to join the cluster with existing peer address + // This prevents old node joining with different name by mistake. + if !s.checkPeerAddressNonconflict() { + log.Fatalf("%v is not allowed to join the cluster with existing URL %v", s.Config.Name, s.Config.URL) + } + // Take old nodes into account. allPeers := s.getKnownPeers() // Discover registered peers. @@ -426,6 +433,25 @@ func (s *PeerServer) Upgradable() error { return nil } +// checkPeerAddressNonconflict checks whether the peer address has existed with different name. +func (s *PeerServer) checkPeerAddressNonconflict() bool { + // there exists the (name, peer address) pair + if peerURL, ok := s.registry.PeerURL(s.Config.Name); ok { + if peerURL == s.Config.URL { + return true + } + } + + // check all existing peer addresses + peerURLs := s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name) + for _, peerURL := range peerURLs { + if peerURL == s.Config.URL { + return false + } + } + return true +} + // Helper function to do discovery and return results in expected format func (s *PeerServer) handleDiscovery(discoverURL string) (peers []string, err error) { peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL) @@ -455,6 +481,8 @@ func (s *PeerServer) handleDiscovery(discoverURL string) (peers []string, err er // getKnownPeers gets the previous peers from log func (s *PeerServer) getKnownPeers() []string { peers := s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name) + log.Infof("Peer URLs in log: %s / %s (%s)", s.raftServer.Leader(), s.Config.Name, strings.Join(peers, ",")) + for i := range peers { u, err := url.Parse(peers[i]) if err != nil { diff --git a/server/registry.go b/server/registry.go index f461fa7e9..813ddef77 100644 --- a/server/registry.go +++ b/server/registry.go @@ -300,8 +300,7 @@ func (r *Registry) urls(key, leaderName, selfName string, url func(key, name str } } - log.Infof("URLs: %s: %s / %s (%s)", key, leaderName, selfName, strings.Join(urls, ",")) - + log.Debugf("URLs: %s: %s / %s (%s)", key, leaderName, selfName, strings.Join(urls, ",")) return urls } diff --git a/tests/functional/rejoin_test.go b/tests/functional/rejoin_test.go index c00e360ab..1cecabece 100644 --- a/tests/functional/rejoin_test.go +++ b/tests/functional/rejoin_test.go @@ -146,3 +146,48 @@ func TestReplaceWithDifferentPeerAddress(t *testing.T) { t.Fatal("Failed to set value in etcd cluster") } } + +// Create a five nodes +// Let the sixth instance join with different name and existing peer address +func TestRejoinWithDifferentName(t *testing.T) { + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + + clusterSize := 5 + argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false) + + if err != nil { + t.Fatal("cannot create cluster") + } + + defer DestroyCluster(etcds) + + time.Sleep(2 * time.Second) + + num := rand.Int() % clusterSize + fmt.Println("join node 6 that collides with node", num+1) + + // kill + etcds[num].Kill() + etcds[num].Release() + time.Sleep(time.Second) + + for i := 0; i < 2; i++ { + // restart + if i == 0 { + etcds[num], err = os.StartProcess(EtcdBinPath, append(argGroup[num], "-name=node6", "-peers=127.0.0.1:7002"), procAttr) + } else { + etcds[num], err = os.StartProcess(EtcdBinPath, append(argGroup[num], "-f", "-name=node6", "-peers=127.0.0.1:7002"), procAttr) + } + if err != nil { + t.Fatal("fail starting etcd:", err) + } + + timer := time.AfterFunc(10*time.Second, func() { + t.Fatal("new etcd should fail immediately") + }) + etcds[num].Wait() + etcds[num] = nil + timer.Stop() + } +} diff --git a/tests/functional/util.go b/tests/functional/util.go index 57eaadcce..f208bba89 100644 --- a/tests/functional/util.go +++ b/tests/functional/util.go @@ -154,6 +154,9 @@ func CreateCluster(size int, procAttr *os.ProcAttr, ssl bool) ([][]string, []*os // Destroy all the nodes in the cluster func DestroyCluster(etcds []*os.Process) error { for _, etcd := range etcds { + if etcd == nil { + continue + } err := etcd.Kill() if err != nil { panic(err.Error()) From e0fbe27c99f24e6f02e9e7e347ee51f5b76d2e7a Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 18 Apr 2014 10:28:24 -0700 Subject: [PATCH 2/3] fix(join_command): use RaftURL as peer address --- server/join_command.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/join_command.go b/server/join_command.go index cfbd330a9..4bd1b2148 100644 --- a/server/join_command.go +++ b/server/join_command.go @@ -66,7 +66,7 @@ func (c *JoinCommandV1) Apply(context raft.Context) (interface{}, error) { // Check if the join command adds an instance that collides with existing one on peer URL. peerURLs := ps.registry.PeerURLs(ps.raftServer.Leader(), c.Name) for _, peerURL := range peerURLs { - if peerURL == c.EtcdURL { + if peerURL == c.RaftURL { log.Warnf("%v tries to join the cluster with existing URL %v", c.Name, c.EtcdURL) return []byte{0}, etcdErr.NewError(etcdErr.EcodeExistingPeerAddr, c.EtcdURL, context.CommitIndex()) } From 000e3ba6510c8661090357f7a53d9c6f10847c24 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 18 Apr 2014 10:48:14 -0700 Subject: [PATCH 3/3] chore(rejoin_test): rewrite some printout --- tests/functional/rejoin_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/functional/rejoin_test.go b/tests/functional/rejoin_test.go index 1cecabece..d0b1e1f54 100644 --- a/tests/functional/rejoin_test.go +++ b/tests/functional/rejoin_test.go @@ -147,7 +147,7 @@ func TestReplaceWithDifferentPeerAddress(t *testing.T) { } } -// Create a five nodes +// Create a five-node cluster // Let the sixth instance join with different name and existing peer address func TestRejoinWithDifferentName(t *testing.T) { procAttr := new(os.ProcAttr) @@ -180,7 +180,7 @@ func TestRejoinWithDifferentName(t *testing.T) { etcds[num], err = os.StartProcess(EtcdBinPath, append(argGroup[num], "-f", "-name=node6", "-peers=127.0.0.1:7002"), procAttr) } if err != nil { - t.Fatal("fail starting etcd:", err) + t.Fatal("failed to start process:", err) } timer := time.AfterFunc(10*time.Second, func() {