diff --git a/Documentation/clustering.md b/Documentation/clustering.md index 4bfc2cfd1..c6b16305d 100644 --- a/Documentation/clustering.md +++ b/Documentation/clustering.md @@ -103,6 +103,13 @@ curl -L http://127.0.0.1:4001/v2/keys/foo -XPUT -d value=bar } ``` +### Rejoining to the Cluster + +If one machine disconnects from the cluster, it could rejoin the cluster automatically when the communication is recovered. + +If one machine is killed, it could rejoin the cluster when started with old name. If the peer address is changed, etcd will treat the new peer address as the refreshed one, which benefits instance migration, or virtual machine boot with different IP. + +**Note:** For now, it is user responsibility to ensure that the machine doesn't join the cluster that has the member with the same name. Or unexpected error will happen. It would be improved sooner or later. ### Killing Nodes in the Cluster diff --git a/server/join_command.go b/server/join_command.go index 0e7cf4d2e..4f83f4529 100644 --- a/server/join_command.go +++ b/server/join_command.go @@ -29,6 +29,17 @@ func (c *JoinCommandV1) CommandName() string { return "etcd:join" } +func (c *JoinCommandV1) updatePeerURL(ps *PeerServer) error { + log.Debugf("Update peer URL of %v to %v", c.Name, c.RaftURL) + if err := ps.registry.UpdatePeerURL(c.Name, c.RaftURL); err != nil { + log.Debugf("Error while updating in registry: %s (%v)", c.Name, err) + return err + } + // Flush commit index, so raft will replay to here when restarted + ps.raftServer.FlushCommitIndex() + return nil +} + // Join a server to the cluster func (c *JoinCommandV1) Apply(context raft.Context) (interface{}, error) { ps, _ := context.Server().Context().(*PeerServer) @@ -40,7 +51,15 @@ func (c *JoinCommandV1) Apply(context raft.Context) (interface{}, error) { ps.registry.Invalidate(c.Name) // Check if the join command is from a previous peer, who lost all its previous log. - if _, ok := ps.registry.ClientURL(c.Name); ok { + if peerURL, ok := ps.registry.PeerURL(c.Name); ok { + // If previous node restarts with different peer URL, + // update its information. + if peerURL != c.RaftURL { + log.Infof("Rejoin with %v instead of %v from %v", c.RaftURL, peerURL, c.Name) + if err := c.updatePeerURL(ps); err != nil { + return []byte{0}, err + } + } return b, nil } @@ -83,6 +102,17 @@ func (c *JoinCommandV2) CommandName() string { return "etcd:v2:join" } +func (c *JoinCommandV2) updatePeerURL(ps *PeerServer) error { + log.Debugf("Update peer URL of %v to %v", c.Name, c.PeerURL) + if err := ps.registry.UpdatePeerURL(c.Name, c.PeerURL); err != nil { + log.Debugf("Error while updating in registry: %s (%v)", c.Name, err) + return err + } + // Flush commit index, so raft will replay to here when restart + ps.raftServer.FlushCommitIndex() + return nil +} + // Apply attempts to join a machine to the cluster. func (c *JoinCommandV2) Apply(context raft.Context) (interface{}, error) { ps, _ := context.Server().Context().(*PeerServer) @@ -95,7 +125,15 @@ func (c *JoinCommandV2) Apply(context raft.Context) (interface{}, error) { ps.registry.Invalidate(c.Name) // Check if the join command is from a previous peer, who lost all its previous log. - if _, ok := ps.registry.ClientURL(c.Name); ok { + if peerURL, ok := ps.registry.PeerURL(c.Name); ok { + // If previous node restarts with different peer URL, + // update its information. + if peerURL != c.PeerURL { + log.Infof("Rejoin with %v instead of %v from %v", c.PeerURL, peerURL, c.Name) + if err := c.updatePeerURL(ps); err != nil { + return []byte{0}, err + } + } return json.Marshal(msg) } diff --git a/server/registry.go b/server/registry.go index 737c0cbb0..f461fa7e9 100644 --- a/server/registry.go +++ b/server/registry.go @@ -103,6 +103,25 @@ func (r *Registry) register(key, name string, peerURL string, machURL string) er return err } +// UpdatePeerURL updates peer URL in registry +func (r *Registry) UpdatePeerURL(name string, peerURL string) error { + r.Lock() + defer r.Unlock() + + machURL, _ := r.clientURL(RegistryPeerKey, name) + // Write data to store. + key := path.Join(RegistryPeerKey, name) + v := url.Values{} + v.Set("raft", peerURL) + v.Set("etcd", machURL) + _, err := r.store.Update(key, v.Encode(), store.Permanent) + + // Invalidate outdated cache. + r.invalidate(name) + log.Debugf("Update PeerURL: %s", name) + return err +} + // UnregisterPeer removes a peer from the registry. func (r *Registry) UnregisterPeer(name string) error { return r.unregister(RegistryPeerKey, name) @@ -290,7 +309,10 @@ func (r *Registry) urls(key, leaderName, selfName string, url func(key, name str func (r *Registry) Invalidate(name string) { r.Lock() defer r.Unlock() + r.invalidate(name) +} +func (r *Registry) invalidate(name string) { delete(r.peers, name) delete(r.standbys, name) } diff --git a/tests/functional/rejoin_test.go b/tests/functional/rejoin_test.go new file mode 100644 index 000000000..c00e360ab --- /dev/null +++ b/tests/functional/rejoin_test.go @@ -0,0 +1,148 @@ +package test + +import ( + "fmt" + "math/rand" + "os" + "strconv" + "strings" + "testing" + "time" + + "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd" +) + +func increasePeerAddressPort(args []string, delta int) []string { + for i, arg := range args { + if !strings.Contains(arg, "peer-addr") { + continue + } + splitArg := strings.Split(arg, ":") + port, _ := strconv.Atoi(splitArg[len(splitArg)-1]) + args[i] = "-peer-addr=127.0.0.1:" + strconv.Itoa(port+delta) + return args + } + return append(args, "-peer-addr=127.0.0.1:"+strconv.Itoa(7001+delta)) +} + +func increaseAddressPort(args []string, delta int) []string { + for i, arg := range args { + if !strings.HasPrefix(arg, "-addr") && !strings.HasPrefix(arg, "--addr") { + continue + } + splitArg := strings.Split(arg, ":") + port, _ := strconv.Atoi(splitArg[len(splitArg)-1]) + args[i] = "-addr=127.0.0.1:" + strconv.Itoa(port+delta) + return args + } + return append(args, "-addr=127.0.0.1:"+strconv.Itoa(4001+delta)) +} + +func increaseDataDir(args []string, delta int) []string { + for i, arg := range args { + if !strings.Contains(arg, "-data-dir") { + continue + } + splitArg := strings.Split(arg, "node") + idx, _ := strconv.Atoi(splitArg[len(splitArg)-1]) + args[i] = "-data-dir=/tmp/node" + strconv.Itoa(idx+delta) + return args + } + return args +} + +// Create a five-node cluster +// Random kill one of the nodes and restart it with different peer address +func TestRejoinWithDifferentPeerAddress(t *testing.T) { + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + + clusterSize := 5 + argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false) + + if err != nil { + t.Fatal("cannot create cluster") + } + + defer DestroyCluster(etcds) + + time.Sleep(2 * time.Second) + + for i := 0; i < 10; i++ { + num := rand.Int() % clusterSize + fmt.Println("kill node", num+1) + + etcds[num].Kill() + etcds[num].Release() + time.Sleep(time.Second) + + argGroup[num] = increasePeerAddressPort(argGroup[num], clusterSize) + // restart + etcds[num], err = os.StartProcess(EtcdBinPath, argGroup[num], procAttr) + if err != nil { + panic(err) + } + time.Sleep(time.Second) + } + + c := etcd.NewClient(nil) + c.SyncCluster() + result, err := c.Set("foo", "bar", 0) + if err != nil || result.Node.Key != "/foo" || result.Node.Value != "bar" { + t.Fatal("Failed to set value in etcd cluster") + } +} + +// Create a five-node cluster +// Replace one of the nodes with different peer address +func TestReplaceWithDifferentPeerAddress(t *testing.T) { + // TODO(yichengq): find some way to avoid the error that will be + // caused if some node joins the cluster with the collided name. + // Possible solutions: + // 1. Remove itself when executing a join command with the same name + // and different peer address. However, it should find some way to + // trigger that execution because the leader may update its address + // and stop heartbeat. + // 2. Remove the node with the same name before join each time. + // But this way could be rather overkill. + t.Skip("Unimplemented functionality") + + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + + clusterSize := 5 + argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false) + + if err != nil { + t.Fatal("cannot create cluster") + } + + defer DestroyCluster(etcds) + + time.Sleep(2 * time.Second) + + rand.Int() + for i := 0; i < 10; i++ { + num := rand.Int() % clusterSize + fmt.Println("replace node", num+1) + + argGroup[num] = increasePeerAddressPort(argGroup[num], clusterSize) + argGroup[num] = increaseAddressPort(argGroup[num], clusterSize) + argGroup[num] = increaseDataDir(argGroup[num], clusterSize) + // restart + newEtcd, err := os.StartProcess(EtcdBinPath, append(argGroup[num], "-f"), procAttr) + if err != nil { + panic(err) + } + + etcds[num].Wait() + etcds[num] = newEtcd + } + + c := etcd.NewClient(nil) + c.SyncCluster() + result, err := c.Set("foo", "bar", 0) + if err != nil || result.Node.Key != "/foo" || result.Node.Value != "bar" { + t.Fatal("Failed to set value in etcd cluster") + } +}