Machine join/remove v2 API.

This commit is contained in:
Ben Johnson
2014-03-18 11:46:48 -07:00
parent e9a1ac15d9
commit 7d4fda550d
13 changed files with 335 additions and 100 deletions

View File

@@ -34,16 +34,31 @@ func (c *DemoteCommand) Apply(context raft.Context) (interface{}, error) {
clientURL, _ := ps.registry.ClientURL(c.Name)
peerURL, _ := ps.registry.PeerURL(c.Name)
// Perform a removal.
(&RemoveCommand{Name: c.Name}).Apply(context)
// Remove node from the shared registry.
err := ps.registry.UnregisterPeer(c.Name)
if err != nil {
log.Debugf("Demote peer %s: Error while unregistering (%v)", c.Name, err)
return nil, err
}
// Delete from stats
delete(ps.followersStats.Followers, c.Name)
// Remove peer in raft
err = context.Server().RemovePeer(c.Name)
if err != nil {
log.Debugf("Demote peer %s: (%v)", c.Name, err)
return nil, err
}
// Register node as a proxy.
ps.registry.RegisterProxy(c.Name, peerURL, clientURL)
// Update mode if this change applies to this server.
if c.Name == ps.Config.Name {
log.Infof("Set mode after demotion: %s", c.Name)
ps.setMode(ProxyMode)
log.Infof("Demote peer %s: Set mode to proxy with %s", c.Name, ps.server.Leader())
ps.proxyPeerURL, _ = ps.registry.PeerURL(ps.server.Leader())
go ps.setMode(ProxyMode)
}
return nil, nil

View File

@@ -1,27 +1,22 @@
package server
import (
"bytes"
"encoding/binary"
"encoding/json"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
)
func init() {
raft.RegisterCommand(&JoinCommand{})
raft.RegisterCommand(&JoinCommandV1{})
raft.RegisterCommand(&JoinCommandV2{})
}
// The JoinCommand adds a node to the cluster.
//
// The command returns the join_index (Uvarint) and peer flag (peer=0, proxy=1)
// in following binary format:
//
// 8 bytes | 1 byte
// join_index | join_mode
//
// This binary protocol is for backward compatibility.
type JoinCommand struct {
// JoinCommandV1 represents a request to join the cluster.
// The command returns the join_index (Uvarint).
type JoinCommandV1 struct {
MinVersion int `json:"minVersion"`
MaxVersion int `json:"maxVersion"`
Name string `json:"name"`
@@ -29,50 +24,30 @@ type JoinCommand struct {
EtcdURL string `json:"etcdURL"`
}
func NewJoinCommand(minVersion int, maxVersion int, name, raftUrl, etcdUrl string) *JoinCommand {
return &JoinCommand{
MinVersion: minVersion,
MaxVersion: maxVersion,
Name: name,
RaftURL: raftUrl,
EtcdURL: etcdUrl,
}
}
// The name of the join command in the log
func (c *JoinCommand) CommandName() string {
func (c *JoinCommandV1) CommandName() string {
return "etcd:join"
}
// Join a server to the cluster
func (c *JoinCommand) Apply(context raft.Context) (interface{}, error) {
func (c *JoinCommandV1) Apply(context raft.Context) (interface{}, error) {
ps, _ := context.Server().Context().(*PeerServer)
var buf bytes.Buffer
b := make([]byte, 8)
n := binary.PutUvarint(b, context.CommitIndex())
buf.Write(b[:n])
binary.PutUvarint(b, context.CommitIndex())
// Make sure we're not getting a cached value from the registry.
ps.registry.Invalidate(c.Name)
// Check if the join command is from a previous peer, who lost all its previous log.
if _, ok := ps.registry.ClientURL(c.Name); ok {
binary.Write(&buf, binary.BigEndian, uint8(peerModeFlag)) // Mark as peer.
return buf.Bytes(), nil
return b, nil
}
// Check peer number in the cluster
if ps.registry.PeerCount() >= ps.ClusterConfig().ActiveSize {
log.Debug("Join as proxy ", c.Name)
ps.registry.RegisterProxy(c.Name, c.RaftURL, c.EtcdURL)
binary.Write(&buf, binary.BigEndian, uint8(proxyModeFlag)) // Mark as proxy.
return buf.Bytes(), nil
}
// Remove it as a proxy if it is one.
if ps.registry.ProxyExists(c.Name) {
ps.registry.UnregisterProxy(c.Name)
log.Debug("Reject join request from ", c.Name)
return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMorePeer, "", context.CommitIndex())
}
// Add to shared peer registry.
@@ -87,10 +62,79 @@ func (c *JoinCommand) Apply(context raft.Context) (interface{}, error) {
ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
}
binary.Write(&buf, binary.BigEndian, uint8(peerModeFlag)) // Mark as peer.
return buf.Bytes(), err
return b, err
}
func (c *JoinCommand) NodeName() string {
func (c *JoinCommandV1) NodeName() string {
return c.Name
}
// JoinCommandV2 represents a request to join the cluster.
type JoinCommandV2 struct {
MinVersion int `json:"minVersion"`
MaxVersion int `json:"maxVersion"`
Name string `json:"name"`
PeerURL string `json:"peerURL"`
ClientURL string `json:"clientURL"`
}
// CommandName returns the name of the command in the Raft log.
func (c *JoinCommandV2) CommandName() string {
return "etcd:v2:join"
}
// Apply attempts to join a machine to the cluster.
func (c *JoinCommandV2) Apply(context raft.Context) (interface{}, error) {
ps, _ := context.Server().Context().(*PeerServer)
var msg = joinMessageV2{
Mode: PeerMode,
CommitIndex: context.CommitIndex(),
}
// Make sure we're not getting a cached value from the registry.
ps.registry.Invalidate(c.Name)
// Check if the join command is from a previous peer, who lost all its previous log.
if _, ok := ps.registry.ClientURL(c.Name); ok {
return json.Marshal(msg)
}
// Check peer number in the cluster.
if ps.registry.PeerCount() >= ps.ClusterConfig().ActiveSize {
log.Debug("Join as proxy ", c.Name)
ps.registry.RegisterProxy(c.Name, c.PeerURL, c.ClientURL)
msg.Mode = ProxyMode
return json.Marshal(msg)
}
// Remove it as a proxy if it is one.
if ps.registry.ProxyExists(c.Name) {
ps.registry.UnregisterProxy(c.Name)
}
// Add to shared peer registry.
ps.registry.RegisterPeer(c.Name, c.PeerURL, c.ClientURL)
// Add peer in raft
if err := context.Server().AddPeer(c.Name, ""); err != nil {
b, _ := json.Marshal(msg)
return b, err
}
// Add peer stats
if c.Name != ps.RaftServer().Name() {
ps.followersStats.Followers[c.Name] = &raftFollowerStats{}
ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
}
return json.Marshal(msg)
}
func (c *JoinCommandV2) NodeName() string {
return c.Name
}
type joinMessageV2 struct {
CommitIndex uint64 `json:"commitIndex"`
Mode Mode `json:"mode"`
}

View File

@@ -1,12 +1,9 @@
package server
import (
"bufio"
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math/rand"
"net/http"
@@ -313,10 +310,6 @@ func (s *PeerServer) HTTPHandler() http.Handler {
router.HandleFunc("/join", s.JoinHttpHandler)
router.HandleFunc("/promote", s.PromoteHttpHandler).Methods("POST")
router.HandleFunc("/remove/{name:.+}", s.RemoveHttpHandler)
router.HandleFunc("/config", s.getClusterConfigHttpHandler).Methods("GET")
router.HandleFunc("/config", s.setClusterConfigHttpHandler).Methods("PUT")
router.HandleFunc("/machines", s.getMachinesHttpHandler).Methods("GET")
router.HandleFunc("/machines/{name}", s.getMachineHttpHandler).Methods("GET")
router.HandleFunc("/vote", s.VoteHttpHandler)
router.HandleFunc("/log", s.GetLogHttpHandler)
router.HandleFunc("/log/append", s.AppendEntriesHttpHandler)
@@ -324,6 +317,13 @@ func (s *PeerServer) HTTPHandler() http.Handler {
router.HandleFunc("/snapshotRecovery", s.SnapshotRecoveryHttpHandler)
router.HandleFunc("/etcdURL", s.EtcdURLHttpHandler)
router.HandleFunc("/v2/admin/config", s.getClusterConfigHttpHandler).Methods("GET")
router.HandleFunc("/v2/admin/config", s.setClusterConfigHttpHandler).Methods("PUT")
router.HandleFunc("/v2/admin/machines", s.getMachinesHttpHandler).Methods("GET")
router.HandleFunc("/v2/admin/machines/{name}", s.getMachineHttpHandler).Methods("GET")
router.HandleFunc("/v2/admin/machines/{name}", s.addMachineHttpHandler).Methods("PUT")
router.HandleFunc("/v2/admin/machines/{name}", s.removeMachineHttpHandler).Methods("DELETE")
return router
}
@@ -340,7 +340,14 @@ func (s *PeerServer) SetServer(server *Server) {
func (s *PeerServer) startAsLeader() {
// leader need to join self as a peer
for {
_, err := s.raftServer.Do(NewJoinCommand(store.MinVersion(), store.MaxVersion(), s.raftServer.Name(), s.Config.URL, s.server.URL()))
c := &JoinCommandV1{
MinVersion: store.MinVersion(),
MaxVersion: store.MaxVersion(),
Name: s.raftServer.Name(),
RaftURL: s.Config.URL,
EtcdURL: s.server.URL(),
}
_, err := s.raftServer.Do(c)
if err == nil {
break
}
@@ -429,8 +436,6 @@ func (s *PeerServer) joinCluster(cluster []string) bool {
// Send join requests to peer.
func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) error {
var b bytes.Buffer
// t must be ok
t, _ := server.Transporter().(*transporter)
@@ -444,14 +449,21 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
return fmt.Errorf("Unable to join: cluster version is %d; version compatibility is %d - %d", version, store.MinVersion(), store.MaxVersion())
}
json.NewEncoder(&b).Encode(NewJoinCommand(store.MinVersion(), store.MaxVersion(), server.Name(), s.Config.URL, s.server.URL()))
joinURL := url.URL{Host: peer, Scheme: scheme, Path: "/join"}
var b bytes.Buffer
c := &JoinCommandV2{
MinVersion: store.MinVersion(),
MaxVersion: store.MaxVersion(),
Name: server.Name(),
PeerURL: s.Config.URL,
ClientURL: s.server.URL(),
}
json.NewEncoder(&b).Encode(c)
joinURL := url.URL{Host: peer, Scheme: scheme, Path: "/v2/admin/machines/" + server.Name()}
log.Debugf("Send Join Request to %s", joinURL.String())
resp, req, err := t.Post(joinURL.String(), &b)
req, _ := http.NewRequest("PUT", joinURL.String(), &b)
resp, err := t.client.Do(req)
for {
if err != nil {
return fmt.Errorf("Unable to join: %v", err)
@@ -462,28 +474,17 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
t.CancelWhenTimeout(req)
if resp.StatusCode == http.StatusOK {
r := bufio.NewReader(resp.Body)
s.joinIndex, _ = binary.ReadUvarint(r)
// Determine whether the server joined as a proxy or peer.
var mode uint64
if mode, err = binary.ReadUvarint(r); err == io.EOF {
mode = peerModeFlag
} else if err != nil {
log.Debugf("Error reading join mode: %v", err)
var msg joinMessageV2
if err := json.NewDecoder(resp.Body).Decode(&msg); err != nil {
log.Debugf("Error reading join response: %v", err)
return err
}
s.joinIndex = msg.CommitIndex
s.setMode(msg.Mode)
switch mode {
case peerModeFlag:
s.setMode(PeerMode)
case proxyModeFlag:
s.setMode(ProxyMode)
if msg.Mode == ProxyMode {
s.proxyClientURL = resp.Header.Get("X-Leader-Client-URL")
s.proxyPeerURL = resp.Header.Get("X-Leader-Peer-URL")
default:
log.Debugf("Invalid join mode: %v", err)
return fmt.Errorf("Invalid join mode (%d): %v", mode, err)
}
return nil
@@ -491,7 +492,14 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
if resp.StatusCode == http.StatusTemporaryRedirect {
address := resp.Header.Get("Location")
log.Debugf("Send Join Request to %s", address)
json.NewEncoder(&b).Encode(NewJoinCommand(store.MinVersion(), store.MaxVersion(), server.Name(), s.Config.URL, s.server.URL()))
c := &JoinCommandV1{
MinVersion: store.MinVersion(),
MaxVersion: store.MaxVersion(),
Name: server.Name(),
RaftURL: s.Config.URL,
EtcdURL: s.server.URL(),
}
json.NewEncoder(&b).Encode(c)
resp, req, err = t.Post(address, &b)
} else if resp.StatusCode == http.StatusBadRequest {

View File

@@ -150,16 +150,14 @@ func (ps *PeerServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Reques
// Response to the join request
func (ps *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
command := &JoinCommand{}
err := uhttp.DecodeJsonRequest(req, command)
if err != nil {
command := &JoinCommandV1{}
if err := uhttp.DecodeJsonRequest(req, command); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
log.Debugf("Receive Join Request from %s", command.Name)
err = ps.server.Dispatch(command, w, req)
err := ps.server.Dispatch(command, w, req)
// Return status.
if err != nil {
@@ -199,7 +197,7 @@ func (ps *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request
}
vars := mux.Vars(req)
command := &RemoveCommand{
command := &RemoveCommandV1{
Name: vars["name"],
}
@@ -286,6 +284,33 @@ func (ps *PeerServer) getMachineMessage(name string) *machineMessage {
return nil
}
// Adds a machine to the cluster.
func (ps *PeerServer) addMachineHttpHandler(w http.ResponseWriter, req *http.Request) {
c := &JoinCommandV2{}
if err := uhttp.DecodeJsonRequest(req, c); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
log.Debugf("Receive Join Request (v2) from %s", c.Name)
if err := ps.server.Dispatch(c, w, req); err != nil {
if etcdErr, ok := err.(*etcdErr.Error); ok {
log.Debug("Return error: ", (*etcdErr).Error())
etcdErr.Write(w)
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
}
// Removes a machine from the cluster.
func (ps *PeerServer) removeMachineHttpHandler(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
c := &RemoveCommandV2{Name: vars["name"]}
log.Debugf("[recv] Remove Request [%s]", c.Name)
ps.server.Dispatch(c, w, req)
}
// Response to the name request
func (ps *PeerServer) NameHttpHandler(w http.ResponseWriter, req *http.Request) {
log.Debugf("[recv] Get %s/name/ ", ps.Config.URL)

View File

@@ -65,13 +65,15 @@ func (r *Registry) Proxies() []string {
// RegisterPeer adds a peer to the registry.
func (r *Registry) RegisterPeer(name string, peerURL string, machURL string) error {
// TODO(benbjohnson): Disallow peers that are already proxies.
return r.register(RegistryPeerKey, name, peerURL, machURL)
if err := r.register(RegistryPeerKey, name, peerURL, machURL); err != nil {
return err
}
r.peers[name] = r.load(RegistryPeerKey, name)
return nil
}
// RegisterProxy adds a proxy to the registry.
func (r *Registry) RegisterProxy(name string, peerURL string, machURL string) error {
// TODO(benbjohnson): Disallow proxies that are already peers.
if err := r.register(RegistryProxyKey, name, peerURL, machURL); err != nil {
return err
}

View File

@@ -2,6 +2,7 @@ package server
import (
"encoding/binary"
"encoding/json"
"os"
"github.com/coreos/etcd/log"
@@ -9,21 +10,22 @@ import (
)
func init() {
raft.RegisterCommand(&RemoveCommand{})
raft.RegisterCommand(&RemoveCommandV1{})
raft.RegisterCommand(&RemoveCommandV2{})
}
// The RemoveCommand removes a server from the cluster.
type RemoveCommand struct {
// The RemoveCommandV1 removes a server from the cluster.
type RemoveCommandV1 struct {
Name string `json:"name"`
}
// The name of the remove command in the log
func (c *RemoveCommand) CommandName() string {
func (c *RemoveCommandV1) CommandName() string {
return "etcd:remove"
}
// Remove a server from the cluster
func (c *RemoveCommand) Apply(context raft.Context) (interface{}, error) {
func (c *RemoveCommandV1) Apply(context raft.Context) (interface{}, error) {
ps, _ := context.Server().Context().(*PeerServer)
// If this is a proxy then remove it and exit.
@@ -70,3 +72,65 @@ func (c *RemoveCommand) Apply(context raft.Context) (interface{}, error) {
return b, err
}
// RemoveCommandV2 represents a command to remove a machine from the server.
type RemoveCommandV2 struct {
Name string `json:"name"`
}
// CommandName returns the name of the command.
func (c *RemoveCommandV2) CommandName() string {
return "etcd:v2:remove"
}
// Apply removes the given machine from the cluster.
func (c *RemoveCommandV2) Apply(context raft.Context) (interface{}, error) {
ps, _ := context.Server().Context().(*PeerServer)
ret, _ := json.Marshal(removeMessageV2{CommitIndex: context.CommitIndex()})
// If this is a proxy then remove it and exit.
if ps.registry.ProxyExists(c.Name) {
if err := ps.registry.UnregisterProxy(c.Name); err != nil {
return nil, err
}
return ret, nil
}
// Remove node from the shared registry.
err := ps.registry.UnregisterPeer(c.Name)
// Delete from stats
delete(ps.followersStats.Followers, c.Name)
if err != nil {
log.Debugf("Error while unregistering: %s (%v)", c.Name, err)
return nil, err
}
// Remove peer in raft
if err := context.Server().RemovePeer(c.Name); err != nil {
log.Debugf("Unable to remove peer: %s (%v)", c.Name, err)
return nil, err
}
if c.Name == context.Server().Name() {
// the removed node is this node
// if the node is not replaying the previous logs
// and the node has sent out a join request in this
// start. It is sure that this node received a new remove
// command and need to be removed
if context.CommitIndex() > ps.joinIndex && ps.joinIndex != 0 {
log.Debugf("server [%s] is removed", context.Server().Name())
os.Exit(0)
} else {
// else ignore remove
log.Debugf("ignore previous remove command.")
}
}
return ret, nil
}
type removeMessageV2 struct {
CommitIndex uint64 `json:"commitIndex"`
}

View File

@@ -262,7 +262,9 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque
var url string
switch c.(type) {
case *JoinCommand, *RemoveCommand:
case *JoinCommandV1, *RemoveCommandV1:
url, _ = ps.registry.PeerURL(leader)
case *JoinCommandV2, *RemoveCommandV2:
url, _ = ps.registry.PeerURL(leader)
default:
url, _ = ps.registry.ClientURL(leader)