Merge pull request #643 from unihorn/45

fix(server): rejoin cluster with different ip
This commit is contained in:
Yicheng Qin 2014-04-17 10:24:04 -07:00
commit cc329bfa55
4 changed files with 217 additions and 2 deletions

View File

@ -103,6 +103,13 @@ curl -L http://127.0.0.1:4001/v2/keys/foo -XPUT -d value=bar
}
```
### Rejoining to the Cluster
If one machine disconnects from the cluster, it could rejoin the cluster automatically when the communication is recovered.
If one machine is killed, it could rejoin the cluster when started with old name. If the peer address is changed, etcd will treat the new peer address as the refreshed one, which benefits instance migration, or virtual machine boot with different IP.
**Note:** For now, it is user responsibility to ensure that the machine doesn't join the cluster that has the member with the same name. Or unexpected error will happen. It would be improved sooner or later.
### Killing Nodes in the Cluster

View File

@ -29,6 +29,17 @@ func (c *JoinCommandV1) CommandName() string {
return "etcd:join"
}
func (c *JoinCommandV1) updatePeerURL(ps *PeerServer) error {
log.Debugf("Update peer URL of %v to %v", c.Name, c.RaftURL)
if err := ps.registry.UpdatePeerURL(c.Name, c.RaftURL); err != nil {
log.Debugf("Error while updating in registry: %s (%v)", c.Name, err)
return err
}
// Flush commit index, so raft will replay to here when restarted
ps.raftServer.FlushCommitIndex()
return nil
}
// Join a server to the cluster
func (c *JoinCommandV1) Apply(context raft.Context) (interface{}, error) {
ps, _ := context.Server().Context().(*PeerServer)
@ -40,7 +51,15 @@ func (c *JoinCommandV1) Apply(context raft.Context) (interface{}, error) {
ps.registry.Invalidate(c.Name)
// Check if the join command is from a previous peer, who lost all its previous log.
if _, ok := ps.registry.ClientURL(c.Name); ok {
if peerURL, ok := ps.registry.PeerURL(c.Name); ok {
// If previous node restarts with different peer URL,
// update its information.
if peerURL != c.RaftURL {
log.Infof("Rejoin with %v instead of %v from %v", c.RaftURL, peerURL, c.Name)
if err := c.updatePeerURL(ps); err != nil {
return []byte{0}, err
}
}
return b, nil
}
@ -83,6 +102,17 @@ func (c *JoinCommandV2) CommandName() string {
return "etcd:v2:join"
}
func (c *JoinCommandV2) updatePeerURL(ps *PeerServer) error {
log.Debugf("Update peer URL of %v to %v", c.Name, c.PeerURL)
if err := ps.registry.UpdatePeerURL(c.Name, c.PeerURL); err != nil {
log.Debugf("Error while updating in registry: %s (%v)", c.Name, err)
return err
}
// Flush commit index, so raft will replay to here when restart
ps.raftServer.FlushCommitIndex()
return nil
}
// Apply attempts to join a machine to the cluster.
func (c *JoinCommandV2) Apply(context raft.Context) (interface{}, error) {
ps, _ := context.Server().Context().(*PeerServer)
@ -95,7 +125,15 @@ func (c *JoinCommandV2) Apply(context raft.Context) (interface{}, error) {
ps.registry.Invalidate(c.Name)
// Check if the join command is from a previous peer, who lost all its previous log.
if _, ok := ps.registry.ClientURL(c.Name); ok {
if peerURL, ok := ps.registry.PeerURL(c.Name); ok {
// If previous node restarts with different peer URL,
// update its information.
if peerURL != c.PeerURL {
log.Infof("Rejoin with %v instead of %v from %v", c.PeerURL, peerURL, c.Name)
if err := c.updatePeerURL(ps); err != nil {
return []byte{0}, err
}
}
return json.Marshal(msg)
}

View File

@ -103,6 +103,25 @@ func (r *Registry) register(key, name string, peerURL string, machURL string) er
return err
}
// UpdatePeerURL updates peer URL in registry
func (r *Registry) UpdatePeerURL(name string, peerURL string) error {
r.Lock()
defer r.Unlock()
machURL, _ := r.clientURL(RegistryPeerKey, name)
// Write data to store.
key := path.Join(RegistryPeerKey, name)
v := url.Values{}
v.Set("raft", peerURL)
v.Set("etcd", machURL)
_, err := r.store.Update(key, v.Encode(), store.Permanent)
// Invalidate outdated cache.
r.invalidate(name)
log.Debugf("Update PeerURL: %s", name)
return err
}
// UnregisterPeer removes a peer from the registry.
func (r *Registry) UnregisterPeer(name string) error {
return r.unregister(RegistryPeerKey, name)
@ -290,7 +309,10 @@ func (r *Registry) urls(key, leaderName, selfName string, url func(key, name str
func (r *Registry) Invalidate(name string) {
r.Lock()
defer r.Unlock()
r.invalidate(name)
}
func (r *Registry) invalidate(name string) {
delete(r.peers, name)
delete(r.standbys, name)
}

View File

@ -0,0 +1,148 @@
package test
import (
"fmt"
"math/rand"
"os"
"strconv"
"strings"
"testing"
"time"
"github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
)
func increasePeerAddressPort(args []string, delta int) []string {
for i, arg := range args {
if !strings.Contains(arg, "peer-addr") {
continue
}
splitArg := strings.Split(arg, ":")
port, _ := strconv.Atoi(splitArg[len(splitArg)-1])
args[i] = "-peer-addr=127.0.0.1:" + strconv.Itoa(port+delta)
return args
}
return append(args, "-peer-addr=127.0.0.1:"+strconv.Itoa(7001+delta))
}
func increaseAddressPort(args []string, delta int) []string {
for i, arg := range args {
if !strings.HasPrefix(arg, "-addr") && !strings.HasPrefix(arg, "--addr") {
continue
}
splitArg := strings.Split(arg, ":")
port, _ := strconv.Atoi(splitArg[len(splitArg)-1])
args[i] = "-addr=127.0.0.1:" + strconv.Itoa(port+delta)
return args
}
return append(args, "-addr=127.0.0.1:"+strconv.Itoa(4001+delta))
}
func increaseDataDir(args []string, delta int) []string {
for i, arg := range args {
if !strings.Contains(arg, "-data-dir") {
continue
}
splitArg := strings.Split(arg, "node")
idx, _ := strconv.Atoi(splitArg[len(splitArg)-1])
args[i] = "-data-dir=/tmp/node" + strconv.Itoa(idx+delta)
return args
}
return args
}
// Create a five-node cluster
// Random kill one of the nodes and restart it with different peer address
func TestRejoinWithDifferentPeerAddress(t *testing.T) {
procAttr := new(os.ProcAttr)
procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
clusterSize := 5
argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false)
if err != nil {
t.Fatal("cannot create cluster")
}
defer DestroyCluster(etcds)
time.Sleep(2 * time.Second)
for i := 0; i < 10; i++ {
num := rand.Int() % clusterSize
fmt.Println("kill node", num+1)
etcds[num].Kill()
etcds[num].Release()
time.Sleep(time.Second)
argGroup[num] = increasePeerAddressPort(argGroup[num], clusterSize)
// restart
etcds[num], err = os.StartProcess(EtcdBinPath, argGroup[num], procAttr)
if err != nil {
panic(err)
}
time.Sleep(time.Second)
}
c := etcd.NewClient(nil)
c.SyncCluster()
result, err := c.Set("foo", "bar", 0)
if err != nil || result.Node.Key != "/foo" || result.Node.Value != "bar" {
t.Fatal("Failed to set value in etcd cluster")
}
}
// Create a five-node cluster
// Replace one of the nodes with different peer address
func TestReplaceWithDifferentPeerAddress(t *testing.T) {
// TODO(yichengq): find some way to avoid the error that will be
// caused if some node joins the cluster with the collided name.
// Possible solutions:
// 1. Remove itself when executing a join command with the same name
// and different peer address. However, it should find some way to
// trigger that execution because the leader may update its address
// and stop heartbeat.
// 2. Remove the node with the same name before join each time.
// But this way could be rather overkill.
t.Skip("Unimplemented functionality")
procAttr := new(os.ProcAttr)
procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
clusterSize := 5
argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false)
if err != nil {
t.Fatal("cannot create cluster")
}
defer DestroyCluster(etcds)
time.Sleep(2 * time.Second)
rand.Int()
for i := 0; i < 10; i++ {
num := rand.Int() % clusterSize
fmt.Println("replace node", num+1)
argGroup[num] = increasePeerAddressPort(argGroup[num], clusterSize)
argGroup[num] = increaseAddressPort(argGroup[num], clusterSize)
argGroup[num] = increaseDataDir(argGroup[num], clusterSize)
// restart
newEtcd, err := os.StartProcess(EtcdBinPath, append(argGroup[num], "-f"), procAttr)
if err != nil {
panic(err)
}
etcds[num].Wait()
etcds[num] = newEtcd
}
c := etcd.NewClient(nil)
c.SyncCluster()
result, err := c.Set("foo", "bar", 0)
if err != nil || result.Node.Key != "/foo" || result.Node.Value != "bar" {
t.Fatal("Failed to set value in etcd cluster")
}
}