add remove peer

This commit is contained in:
Xiang Li 2013-08-19 12:10:11 -07:00
parent 57ef6e9f5a
commit 64e6d54758
5 changed files with 161 additions and 36 deletions

View File

@ -1,11 +1,13 @@
package main
import (
"encoding/binary"
"encoding/json"
"fmt"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
"os"
"path"
"time"
)
@ -140,18 +142,25 @@ func (c *JoinCommand) CommandName() string {
// Join a server to the cluster
func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
if c.Name == r.Name() {
r.pendingJoin = false
}
// check if the join command is from a previous machine, who lost all its previous log.
response, _ := etcdStore.RawGet(path.Join("_etcd/machines", c.Name))
b := make([]byte, 8)
binary.PutUvarint(b, raftServer.CommitIndex())
if response != nil {
return []byte("join success"), nil
return b, nil
}
// check machine number in the cluster
num := machineNum()
if num == maxClusterSize {
debug("Reject join request from ", c.Name)
return []byte("join fail"), etcdErr.NewError(103, "")
return []byte{0}, etcdErr.NewError(103, "")
}
addNameToURL(c.Name, c.RaftVersion, c.RaftURL, c.EtcdURL)
@ -164,9 +173,70 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion)
etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex())
return []byte("join success"), err
return b, err
}
func (c *JoinCommand) NodeName() string {
return c.Name
}
// RemoveCommand
type RemoveCommand struct {
Name string `json:"name"`
}
// The name of the remove command in the log
func (c *RemoveCommand) CommandName() string {
return "etcd:remove"
}
// Remove a server from the cluster
func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) {
// remove machine in etcd storage
key := path.Join("_etcd/machines", c.Name)
_, err := etcdStore.Delete(key, raftServer.CommitIndex())
if err != nil {
return []byte{0}, err
}
// remove peer in raft
err = raftServer.RemovePeer(c.Name)
if err != nil {
return []byte{0}, err
}
if c.Name == raftServer.Name() {
// the removed node is this node
// if the node is not replying the previous logs
// and the node has sent out a join request in this
// start. It is sure that this node received a new remove
// command and need to be removed
if raftServer.CommitIndex() > r.joinIndex && r.joinIndex != 0 {
debugf("server [%s] is removed", raftServer.Name())
os.Exit(0)
} else {
// the node is replying previous logs and there is a join command
// afterwards, we should not exit
if r.joinIndex == 0 {
// if the node has not sent a join command in this start
// it will need to send a join command after reply the logs
r.pendingJoin = true
} else {
// else ignore remove
debugf("ignore previous remove command.")
}
}
}
b := make([]byte, 8)
binary.PutUvarint(b, raftServer.CommitIndex())
return b, err
}

View File

@ -189,7 +189,7 @@ func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error {
// Handler to return all the known machines in the current cluster
func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error {
machines := getMachines()
machines := getMachines(true)
w.WriteHeader(http.StatusOK)
w.Write([]byte(strings.Join(machines, ", ")))

View File

@ -8,13 +8,21 @@ func machineNum() int {
}
// getMachines gets the current machines in the cluster
func getMachines() []string {
func getMachines(etcd bool) []string {
peers := r.Peers()
machines := make([]string, len(peers)+1)
leader, ok := nameToEtcdURL(r.Leader())
var toURL func(string) (string, bool)
if etcd {
toURL = nameToEtcdURL
} else {
toURL = nameToRaftURL
}
leader, ok := toURL(r.Leader())
self := e.url
i := 1
@ -30,7 +38,7 @@ func getMachines() []string {
// Add all peers to the slice
for peerName, _ := range peers {
if machine, ok := nameToEtcdURL(peerName); ok {
if machine, ok := toURL(peerName); ok {
// do not add leader twice
if machine != leader {
machines[i] = machine

View File

@ -107,6 +107,24 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) error {
}
}
// Response to remove request
func RemoveHttpHandler(w http.ResponseWriter, req *http.Request) {
if req.Method != "DELETE" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
nodeName := req.URL.Path[len("/remove/"):]
command := &RemoveCommand{
Name: nodeName,
}
debugf("[recv] Remove Request [%s]", command.Name)
dispatch(command, w, req, false)
}
// Response to the name request
func NameHttpHandler(w http.ResponseWriter, req *http.Request) {
debugf("[recv] Get %s/name/ ", r.url)

View File

@ -3,6 +3,7 @@ package main
import (
"bytes"
"crypto/tls"
"encoding/binary"
"encoding/json"
"fmt"
etcdErr "github.com/coreos/etcd/error"
@ -15,11 +16,13 @@ import (
type raftServer struct {
*raft.Server
version string
name string
url string
tlsConf *TLSConfig
tlsInfo *TLSInfo
version string
joinIndex uint64
pendingJoin bool
name string
url string
tlsConf *TLSConfig
tlsInfo *TLSInfo
}
var r *raftServer
@ -77,6 +80,22 @@ func (r *raftServer) ListenAndServe() {
}
} else {
if r.pendingJoin {
cluster = getMachines(false)
for i := 0; i < len(cluster); i++ {
u, err := url.Parse(cluster[i])
if err != nil {
debug("rejoin cannot parse url: ", err)
}
cluster[i] = u.Host
}
ok := joinCluster(cluster)
if !ok {
fatal("cannot rejoin to the cluster")
}
}
// rejoin the previous cluster
debugf("%s restart as a follower", r.name)
}
@ -105,26 +124,10 @@ func startAsLeader() {
func startAsFollower() {
// start as a follower in a existing cluster
for i := 0; i < retryTimes; i++ {
for _, machine := range cluster {
if len(machine) == 0 {
continue
}
err := joinCluster(r.Server, machine, r.tlsConf.Scheme)
if err == nil {
debugf("%s success join to the cluster via machine %s", r.name, machine)
return
} else {
if _, ok := err.(etcdErr.Error); ok {
fatal(err)
}
debugf("cannot join to cluster via machine %s %s", machine, err)
}
ok := joinCluster(cluster)
if ok {
return
}
warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval)
time.Sleep(time.Second * RetryInterval)
}
@ -149,6 +152,7 @@ func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) {
raftMux.HandleFunc("/name", NameHttpHandler)
raftMux.HandleFunc("/version", RaftVersionHttpHandler)
raftMux.Handle("/join", errorHandler(JoinHttpHandler))
raftMux.HandleFunc("/remove/", RemoveHttpHandler)
raftMux.HandleFunc("/vote", VoteHttpHandler)
raftMux.HandleFunc("/log", GetLogHttpHandler)
raftMux.HandleFunc("/log/append", AppendEntriesHttpHandler)
@ -180,15 +184,37 @@ func getVersion(t transporter, versionURL url.URL) (string, error) {
return string(body), nil
}
// Send join requests to the leader.
func joinCluster(s *raft.Server, raftURL string, scheme string) error {
func joinCluster(cluster []string) bool {
for _, machine := range cluster {
if len(machine) == 0 {
continue
}
err := joinByMachine(r.Server, machine, r.tlsConf.Scheme)
if err == nil {
debugf("%s success join to the cluster via machine %s", r.name, machine)
return true
} else {
if _, ok := err.(etcdErr.Error); ok {
fatal(err)
}
debugf("cannot join to cluster via machine %s %s", machine, err)
}
}
return false
}
// Send join requests to machine.
func joinByMachine(s *raft.Server, machine string, scheme string) error {
var b bytes.Buffer
// t must be ok
t, _ := r.Transporter().(transporter)
// Our version must match the leaders version
versionURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/version"}
versionURL := url.URL{Host: machine, Scheme: scheme, Path: "/version"}
version, err := getVersion(t, versionURL)
if err != nil {
return fmt.Errorf("Unable to join: %v", err)
@ -202,9 +228,9 @@ func joinCluster(s *raft.Server, raftURL string, scheme string) error {
json.NewEncoder(&b).Encode(newJoinCommand())
joinURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/join"}
joinURL := url.URL{Host: machine, Scheme: scheme, Path: "/join"}
debugf("Send Join Request to %s", raftURL)
debugf("Send Join Request to %s", joinURL.String())
resp, err := t.Post(joinURL.String(), &b)
@ -215,6 +241,8 @@ func joinCluster(s *raft.Server, raftURL string, scheme string) error {
if resp != nil {
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
b, _ := ioutil.ReadAll(resp.Body)
r.joinIndex, _ = binary.Uvarint(b)
return nil
}
if resp.StatusCode == http.StatusTemporaryRedirect {
@ -244,6 +272,7 @@ func joinCluster(s *raft.Server, raftURL string, scheme string) error {
// Register commands to raft server
func registerCommands() {
raft.RegisterCommand(&JoinCommand{})
raft.RegisterCommand(&RemoveCommand{})
raft.RegisterCommand(&SetCommand{})
raft.RegisterCommand(&GetCommand{})
raft.RegisterCommand(&DeleteCommand{})