From 273c293645234b54f8c0212b922a1e85b1538da5 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 20 Mar 2014 17:34:17 -0700 Subject: [PATCH 1/3] fix(server): rejoin cluster with different ip --- server/join_command.go | 42 +++++++++++++++++++- server/registry.go | 22 +++++++++++ tests/functional/rejoin_test.go | 69 +++++++++++++++++++++++++++++++++ 3 files changed, 131 insertions(+), 2 deletions(-) create mode 100644 tests/functional/rejoin_test.go diff --git a/server/join_command.go b/server/join_command.go index 0e7cf4d2e..4f83f4529 100644 --- a/server/join_command.go +++ b/server/join_command.go @@ -29,6 +29,17 @@ func (c *JoinCommandV1) CommandName() string { return "etcd:join" } +func (c *JoinCommandV1) updatePeerURL(ps *PeerServer) error { + log.Debugf("Update peer URL of %v to %v", c.Name, c.RaftURL) + if err := ps.registry.UpdatePeerURL(c.Name, c.RaftURL); err != nil { + log.Debugf("Error while updating in registry: %s (%v)", c.Name, err) + return err + } + // Flush commit index, so raft will replay to here when restarted + ps.raftServer.FlushCommitIndex() + return nil +} + // Join a server to the cluster func (c *JoinCommandV1) Apply(context raft.Context) (interface{}, error) { ps, _ := context.Server().Context().(*PeerServer) @@ -40,7 +51,15 @@ func (c *JoinCommandV1) Apply(context raft.Context) (interface{}, error) { ps.registry.Invalidate(c.Name) // Check if the join command is from a previous peer, who lost all its previous log. - if _, ok := ps.registry.ClientURL(c.Name); ok { + if peerURL, ok := ps.registry.PeerURL(c.Name); ok { + // If previous node restarts with different peer URL, + // update its information. + if peerURL != c.RaftURL { + log.Infof("Rejoin with %v instead of %v from %v", c.RaftURL, peerURL, c.Name) + if err := c.updatePeerURL(ps); err != nil { + return []byte{0}, err + } + } return b, nil } @@ -83,6 +102,17 @@ func (c *JoinCommandV2) CommandName() string { return "etcd:v2:join" } +func (c *JoinCommandV2) updatePeerURL(ps *PeerServer) error { + log.Debugf("Update peer URL of %v to %v", c.Name, c.PeerURL) + if err := ps.registry.UpdatePeerURL(c.Name, c.PeerURL); err != nil { + log.Debugf("Error while updating in registry: %s (%v)", c.Name, err) + return err + } + // Flush commit index, so raft will replay to here when restart + ps.raftServer.FlushCommitIndex() + return nil +} + // Apply attempts to join a machine to the cluster. func (c *JoinCommandV2) Apply(context raft.Context) (interface{}, error) { ps, _ := context.Server().Context().(*PeerServer) @@ -95,7 +125,15 @@ func (c *JoinCommandV2) Apply(context raft.Context) (interface{}, error) { ps.registry.Invalidate(c.Name) // Check if the join command is from a previous peer, who lost all its previous log. - if _, ok := ps.registry.ClientURL(c.Name); ok { + if peerURL, ok := ps.registry.PeerURL(c.Name); ok { + // If previous node restarts with different peer URL, + // update its information. + if peerURL != c.PeerURL { + log.Infof("Rejoin with %v instead of %v from %v", c.PeerURL, peerURL, c.Name) + if err := c.updatePeerURL(ps); err != nil { + return []byte{0}, err + } + } return json.Marshal(msg) } diff --git a/server/registry.go b/server/registry.go index 737c0cbb0..f461fa7e9 100644 --- a/server/registry.go +++ b/server/registry.go @@ -103,6 +103,25 @@ func (r *Registry) register(key, name string, peerURL string, machURL string) er return err } +// UpdatePeerURL updates peer URL in registry +func (r *Registry) UpdatePeerURL(name string, peerURL string) error { + r.Lock() + defer r.Unlock() + + machURL, _ := r.clientURL(RegistryPeerKey, name) + // Write data to store. + key := path.Join(RegistryPeerKey, name) + v := url.Values{} + v.Set("raft", peerURL) + v.Set("etcd", machURL) + _, err := r.store.Update(key, v.Encode(), store.Permanent) + + // Invalidate outdated cache. + r.invalidate(name) + log.Debugf("Update PeerURL: %s", name) + return err +} + // UnregisterPeer removes a peer from the registry. func (r *Registry) UnregisterPeer(name string) error { return r.unregister(RegistryPeerKey, name) @@ -290,7 +309,10 @@ func (r *Registry) urls(key, leaderName, selfName string, url func(key, name str func (r *Registry) Invalidate(name string) { r.Lock() defer r.Unlock() + r.invalidate(name) +} +func (r *Registry) invalidate(name string) { delete(r.peers, name) delete(r.standbys, name) } diff --git a/tests/functional/rejoin_test.go b/tests/functional/rejoin_test.go new file mode 100644 index 000000000..dc078eb6f --- /dev/null +++ b/tests/functional/rejoin_test.go @@ -0,0 +1,69 @@ +package test + +import ( + "fmt" + "math/rand" + "os" + "strconv" + "strings" + "testing" + "time" + + "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd" +) + +func increasePeerAddressPort(args []string, delta int) []string { + for i, arg := range args { + if !strings.Contains(arg, "peer-addr") { + continue + } + splitArg := strings.Split(arg, ":") + port, _ := strconv.Atoi(splitArg[len(splitArg)-1]) + args[i] = "-peer-addr=127.0.0.1:" + strconv.Itoa(port+delta) + return args + } + return append(args, "-peer-addr=127.0.0.1:"+strconv.Itoa(7001+delta)) +} + +// Create a five-node cluster +// Random kill one of the nodes and restart it with different peer address +func TestRejoinWithDifferentPeerAddress(t *testing.T) { + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + + clusterSize := 5 + argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false) + + if err != nil { + t.Fatal("cannot create cluster") + } + + defer DestroyCluster(etcds) + + time.Sleep(2 * time.Second) + + for i := 0; i < 10; i++ { + num := rand.Int() % clusterSize + fmt.Println("kill node", num+1) + + // kill + etcds[num].Kill() + etcds[num].Release() + time.Sleep(time.Second) + + argGroup[num] = increasePeerAddressPort(argGroup[num], clusterSize) + // restart + etcds[num], err = os.StartProcess(EtcdBinPath, argGroup[num], procAttr) + if err != nil { + panic(err) + } + time.Sleep(time.Second) + } + + c := etcd.NewClient(nil) + c.SyncCluster() + result, err := c.Set("foo", "bar", 0) + if err != nil || result.Node.Key != "/foo" || result.Node.Value != "bar" { + t.Fatal("Failed to set value in etcd cluster") + } +} From 6988676f43758c3ee8f42433f21ab73ec1a25e5d Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 17 Apr 2014 10:16:53 -0700 Subject: [PATCH 2/3] docs(clustering): docs about rejoin --- Documentation/clustering.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/Documentation/clustering.md b/Documentation/clustering.md index 4bfc2cfd1..c6b16305d 100644 --- a/Documentation/clustering.md +++ b/Documentation/clustering.md @@ -103,6 +103,13 @@ curl -L http://127.0.0.1:4001/v2/keys/foo -XPUT -d value=bar } ``` +### Rejoining to the Cluster + +If one machine disconnects from the cluster, it could rejoin the cluster automatically when the communication is recovered. + +If one machine is killed, it could rejoin the cluster when started with old name. If the peer address is changed, etcd will treat the new peer address as the refreshed one, which benefits instance migration, or virtual machine boot with different IP. + +**Note:** For now, it is user responsibility to ensure that the machine doesn't join the cluster that has the member with the same name. Or unexpected error will happen. It would be improved sooner or later. ### Killing Nodes in the Cluster From 732fb7c1604406151c12ca1f223c16347c698744 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 17 Apr 2014 10:17:26 -0700 Subject: [PATCH 3/3] tests(rejoin): add TestReplaceWithDifferentPeerAddress The functionality has not been implemented yet. --- tests/functional/rejoin_test.go | 81 ++++++++++++++++++++++++++++++++- 1 file changed, 80 insertions(+), 1 deletion(-) diff --git a/tests/functional/rejoin_test.go b/tests/functional/rejoin_test.go index dc078eb6f..c00e360ab 100644 --- a/tests/functional/rejoin_test.go +++ b/tests/functional/rejoin_test.go @@ -25,6 +25,32 @@ func increasePeerAddressPort(args []string, delta int) []string { return append(args, "-peer-addr=127.0.0.1:"+strconv.Itoa(7001+delta)) } +func increaseAddressPort(args []string, delta int) []string { + for i, arg := range args { + if !strings.HasPrefix(arg, "-addr") && !strings.HasPrefix(arg, "--addr") { + continue + } + splitArg := strings.Split(arg, ":") + port, _ := strconv.Atoi(splitArg[len(splitArg)-1]) + args[i] = "-addr=127.0.0.1:" + strconv.Itoa(port+delta) + return args + } + return append(args, "-addr=127.0.0.1:"+strconv.Itoa(4001+delta)) +} + +func increaseDataDir(args []string, delta int) []string { + for i, arg := range args { + if !strings.Contains(arg, "-data-dir") { + continue + } + splitArg := strings.Split(arg, "node") + idx, _ := strconv.Atoi(splitArg[len(splitArg)-1]) + args[i] = "-data-dir=/tmp/node" + strconv.Itoa(idx+delta) + return args + } + return args +} + // Create a five-node cluster // Random kill one of the nodes and restart it with different peer address func TestRejoinWithDifferentPeerAddress(t *testing.T) { @@ -46,7 +72,6 @@ func TestRejoinWithDifferentPeerAddress(t *testing.T) { num := rand.Int() % clusterSize fmt.Println("kill node", num+1) - // kill etcds[num].Kill() etcds[num].Release() time.Sleep(time.Second) @@ -67,3 +92,57 @@ func TestRejoinWithDifferentPeerAddress(t *testing.T) { t.Fatal("Failed to set value in etcd cluster") } } + +// Create a five-node cluster +// Replace one of the nodes with different peer address +func TestReplaceWithDifferentPeerAddress(t *testing.T) { + // TODO(yichengq): find some way to avoid the error that will be + // caused if some node joins the cluster with the collided name. + // Possible solutions: + // 1. Remove itself when executing a join command with the same name + // and different peer address. However, it should find some way to + // trigger that execution because the leader may update its address + // and stop heartbeat. + // 2. Remove the node with the same name before join each time. + // But this way could be rather overkill. + t.Skip("Unimplemented functionality") + + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + + clusterSize := 5 + argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false) + + if err != nil { + t.Fatal("cannot create cluster") + } + + defer DestroyCluster(etcds) + + time.Sleep(2 * time.Second) + + rand.Int() + for i := 0; i < 10; i++ { + num := rand.Int() % clusterSize + fmt.Println("replace node", num+1) + + argGroup[num] = increasePeerAddressPort(argGroup[num], clusterSize) + argGroup[num] = increaseAddressPort(argGroup[num], clusterSize) + argGroup[num] = increaseDataDir(argGroup[num], clusterSize) + // restart + newEtcd, err := os.StartProcess(EtcdBinPath, append(argGroup[num], "-f"), procAttr) + if err != nil { + panic(err) + } + + etcds[num].Wait() + etcds[num] = newEtcd + } + + c := etcd.NewClient(nil) + c.SyncCluster() + result, err := c.Set("foo", "bar", 0) + if err != nil || result.Node.Key != "/foo" || result.Node.Value != "bar" { + t.Fatal("Failed to set value in etcd cluster") + } +}