From 273c293645234b54f8c0212b922a1e85b1538da5 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 20 Mar 2014 17:34:17 -0700 Subject: [PATCH] fix(server): rejoin cluster with different ip --- server/join_command.go | 42 +++++++++++++++++++- server/registry.go | 22 +++++++++++ tests/functional/rejoin_test.go | 69 +++++++++++++++++++++++++++++++++ 3 files changed, 131 insertions(+), 2 deletions(-) create mode 100644 tests/functional/rejoin_test.go diff --git a/server/join_command.go b/server/join_command.go index 0e7cf4d2e..4f83f4529 100644 --- a/server/join_command.go +++ b/server/join_command.go @@ -29,6 +29,17 @@ func (c *JoinCommandV1) CommandName() string { return "etcd:join" } +func (c *JoinCommandV1) updatePeerURL(ps *PeerServer) error { + log.Debugf("Update peer URL of %v to %v", c.Name, c.RaftURL) + if err := ps.registry.UpdatePeerURL(c.Name, c.RaftURL); err != nil { + log.Debugf("Error while updating in registry: %s (%v)", c.Name, err) + return err + } + // Flush commit index, so raft will replay to here when restarted + ps.raftServer.FlushCommitIndex() + return nil +} + // Join a server to the cluster func (c *JoinCommandV1) Apply(context raft.Context) (interface{}, error) { ps, _ := context.Server().Context().(*PeerServer) @@ -40,7 +51,15 @@ func (c *JoinCommandV1) Apply(context raft.Context) (interface{}, error) { ps.registry.Invalidate(c.Name) // Check if the join command is from a previous peer, who lost all its previous log. - if _, ok := ps.registry.ClientURL(c.Name); ok { + if peerURL, ok := ps.registry.PeerURL(c.Name); ok { + // If a previous node restarts with a different peer URL, + // update its information. 
+ if peerURL != c.RaftURL { + log.Infof("Rejoin with %v instead of %v from %v", c.RaftURL, peerURL, c.Name) + if err := c.updatePeerURL(ps); err != nil { + return []byte{0}, err + } + } return b, nil } @@ -83,6 +102,17 @@ func (c *JoinCommandV2) CommandName() string { return "etcd:v2:join" } +func (c *JoinCommandV2) updatePeerURL(ps *PeerServer) error { + log.Debugf("Update peer URL of %v to %v", c.Name, c.PeerURL) + if err := ps.registry.UpdatePeerURL(c.Name, c.PeerURL); err != nil { + log.Debugf("Error while updating in registry: %s (%v)", c.Name, err) + return err + } + // Flush commit index, so raft will replay to here when restarted + ps.raftServer.FlushCommitIndex() + return nil +} + // Apply attempts to join a machine to the cluster. func (c *JoinCommandV2) Apply(context raft.Context) (interface{}, error) { ps, _ := context.Server().Context().(*PeerServer) @@ -95,7 +125,15 @@ func (c *JoinCommandV2) Apply(context raft.Context) (interface{}, error) { ps.registry.Invalidate(c.Name) // Check if the join command is from a previous peer, who lost all its previous log. - if _, ok := ps.registry.ClientURL(c.Name); ok { + if peerURL, ok := ps.registry.PeerURL(c.Name); ok { + // If a previous node restarts with a different peer URL, + // update its information. + if peerURL != c.PeerURL { + log.Infof("Rejoin with %v instead of %v from %v", c.PeerURL, peerURL, c.Name) + if err := c.updatePeerURL(ps); err != nil { + return []byte{0}, err + } + } return json.Marshal(msg) } diff --git a/server/registry.go b/server/registry.go index 737c0cbb0..f461fa7e9 100644 --- a/server/registry.go +++ b/server/registry.go @@ -103,6 +103,25 @@ func (r *Registry) register(key, name string, peerURL string, machURL string) er return err } +// UpdatePeerURL updates the peer URL of the named peer in the registry, writing back the URL returned by clientURL alongside it. +func (r *Registry) UpdatePeerURL(name string, peerURL string) error { + r.Lock() + defer r.Unlock() + + machURL, _ := r.clientURL(RegistryPeerKey, name) + // Write data to store. NOTE(review): the second result of clientURL is discarded above, so machURL is written back unchecked - confirm this is intended. 
+ key := path.Join(RegistryPeerKey, name) + v := url.Values{} + v.Set("raft", peerURL) + v.Set("etcd", machURL) + _, err := r.store.Update(key, v.Encode(), store.Permanent) + + // Invalidate outdated cache (runs even if the store update failed; err is returned below). + r.invalidate(name) + log.Debugf("Update PeerURL: %s", name) + return err +} + // UnregisterPeer removes a peer from the registry. func (r *Registry) UnregisterPeer(name string) error { return r.unregister(RegistryPeerKey, name) @@ -290,7 +309,10 @@ func (r *Registry) urls(key, leaderName, selfName string, url func(key, name str func (r *Registry) Invalidate(name string) { r.Lock() defer r.Unlock() + r.invalidate(name) +} +func (r *Registry) invalidate(name string) { delete(r.peers, name) delete(r.standbys, name) } diff --git a/tests/functional/rejoin_test.go b/tests/functional/rejoin_test.go new file mode 100644 index 000000000..dc078eb6f --- /dev/null +++ b/tests/functional/rejoin_test.go @@ -0,0 +1,69 @@ +package test + +import ( + "fmt" + "math/rand" + "os" + "strconv" + "strings" + "testing" + "time" + + "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd" +) + +func increasePeerAddressPort(args []string, delta int) []string { + for i, arg := range args { + if !strings.Contains(arg, "peer-addr") { + continue + } + splitArg := strings.Split(arg, ":") + port, _ := strconv.Atoi(splitArg[len(splitArg)-1]) + args[i] = "-peer-addr=127.0.0.1:" + strconv.Itoa(port+delta) + return args + } + return append(args, "-peer-addr=127.0.0.1:"+strconv.Itoa(7001+delta)) +} + +// TestRejoinWithDifferentPeerAddress creates a five-node cluster, then ten times over +// randomly kills one of the nodes and restarts it with a different peer address; it finally checks that a Set on the cluster still succeeds. +func TestRejoinWithDifferentPeerAddress(t *testing.T) { + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + + clusterSize := 5 + argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false) + + if err != nil { + t.Fatal("cannot create cluster") + } + + defer DestroyCluster(etcds) + + time.Sleep(2 * time.Second) + + for i := 0; i < 10; i++ 
{ + num := rand.Int() % clusterSize + fmt.Println("kill node", num+1) + + // kill the chosen node and release the process + etcds[num].Kill() + etcds[num].Release() + time.Sleep(time.Second) + + argGroup[num] = increasePeerAddressPort(argGroup[num], clusterSize) + // restart it with the shifted peer address + etcds[num], err = os.StartProcess(EtcdBinPath, argGroup[num], procAttr) + if err != nil { + panic(err) + } + time.Sleep(time.Second) + } + + c := etcd.NewClient(nil) + c.SyncCluster() + result, err := c.Set("foo", "bar", 0) + if err != nil || result.Node.Key != "/foo" || result.Node.Value != "bar" { + t.Fatal("Failed to set value in etcd cluster") + } +}