Intermediate commit.

This commit is contained in:
Ben Johnson
2013-10-12 00:28:46 -06:00
parent 89334df5ae
commit eb78d96a20
27 changed files with 953 additions and 923 deletions

View File

@@ -2,8 +2,6 @@ package server
import (
"encoding/binary"
"fmt"
"path"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/log"
@@ -42,38 +40,32 @@ func (c *JoinCommand) CommandName() string {
// Join a server to the cluster
func (c *JoinCommand) Apply(server *raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(*store.Store)
r, _ := server.Context().(*RaftServer)
// check if the join command is from a previous machine, who lost all its previous log.
e, _ := s.Get(path.Join("/_etcd/machines", c.Name), false, false, server.CommitIndex(), server.Term())
ps, _ := server.Context().(*PeerServer)
b := make([]byte, 8)
binary.PutUvarint(b, server.CommitIndex())
if e != nil {
// Check if the join command is from a previous machine, who lost all its previous log.
if _, ok := ps.registry.URL(c.Name); ok {
return b, nil
}
// check machine number in the cluster
if s.MachineCount() == c.MaxClusterSize {
// Check machine number in the cluster
if ps.registry.Count() == c.MaxClusterSize {
log.Debug("Reject join request from ", c.Name)
return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMoreMachine, "", server.CommitIndex(), server.Term())
}
addNameToURL(c.Name, c.RaftVersion, c.RaftURL, c.EtcdURL)
// Add to shared machine registry.
ps.registry.Register(c.Name, c.RaftVersion, c.RaftURL, c.EtcdURL, server.CommitIndex(), server.Term())
// add peer in raft
// Add peer in raft
err := server.AddPeer(c.Name, "")
// add machine in etcd storage
key := path.Join("_etcd/machines", c.Name)
value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion)
s.Create(key, value, false, false, store.Permanent, server.CommitIndex(), server.Term())
// add peer stats
if c.Name != r.Name() {
r.followersStats.Followers[c.Name] = &raftFollowerStats{}
r.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
// Add peer stats
if c.Name != ps.Name() {
ps.followersStats.Followers[c.Name] = &raftFollowerStats{}
ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
}
return b, err

474
server/peer_server.go Normal file
View File

@@ -0,0 +1,474 @@
package server
import (
"bytes"
"crypto/tls"
"encoding/binary"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"time"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
)
type PeerServer struct {
*raft.Server
joinIndex uint64
name string
url string
listenHost string
tlsConf *TLSConfig
tlsInfo *TLSInfo
followersStats *raftFollowersStats
serverStats *raftServerStats
registry *Registry
store *store.Store
snapConf *snapshotConf
MaxClusterSize int
}
// TODO: find a good policy to do snapshot
type snapshotConf struct {
// Etcd will check if snapshot is need every checkingInterval
checkingInterval time.Duration
// The number of writes when the last snapshot happened
lastWrites uint64
// If the incremental number of writes since the last snapshot
// exceeds the write Threshold, etcd will do a snapshot
writesThr uint64
}
func NewPeerServer(name string, path string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store *store.Store) *PeerServer {
s := &PeerServer{
name: name,
url: url,
listenHost: listenHost,
tlsConf: tlsConf,
tlsInfo: tlsInfo,
registry: registry,
store: store,
snapConf: &snapshotConf{time.Second * 3, 0, 20 * 1000},
followersStats: &raftFollowersStats{
Leader: name,
Followers: make(map[string]*raftFollowerStats),
},
serverStats: &raftServerStats{
StartTime: time.Now(),
sendRateQueue: &statsQueue{
back: -1,
},
recvRateQueue: &statsQueue{
back: -1,
},
},
}
// Create transporter for raft
raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, s)
// Create raft server
server, err := raft.NewServer(name, path, raftTransporter, s.store, s, "")
if err != nil {
log.Fatal(err)
}
s.Server = server
return s
}
// Start the raft server
func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) {
// LoadSnapshot
if snapshot {
err := s.LoadSnapshot()
if err == nil {
log.Debugf("%s finished load snapshot", s.name)
} else {
log.Debug(err)
}
}
s.SetElectionTimeout(ElectionTimeout)
s.SetHeartbeatTimeout(HeartbeatTimeout)
s.Start()
if s.IsLogEmpty() {
// start as a leader in a new cluster
if len(cluster) == 0 {
s.startAsLeader()
} else {
s.startAsFollower(cluster)
}
} else {
// Rejoin the previous cluster
cluster = s.registry.PeerURLs()
for i := 0; i < len(cluster); i++ {
u, err := url.Parse(cluster[i])
if err != nil {
log.Debug("rejoin cannot parse url: ", err)
}
cluster[i] = u.Host
}
ok := s.joinCluster(cluster)
if !ok {
log.Warn("the entire cluster is down! this machine will restart the cluster.")
}
log.Debugf("%s restart as a follower", s.name)
}
// open the snapshot
if snapshot {
go s.monitorSnapshot()
}
// start to response to raft requests
go s.startTransport(s.tlsConf.Scheme, s.tlsConf.Server)
}
// Get all the current logs
func (s *PeerServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
log.Debugf("[recv] GET %s/log", s.url)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(s.LogEntries())
}
// Response to vote request
func (s *PeerServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
rvreq := &raft.RequestVoteRequest{}
err := decodeJsonRequest(req, rvreq)
if err == nil {
log.Debugf("[recv] POST %s/vote [%s]", s.url, rvreq.CandidateName)
if resp := s.RequestVote(rvreq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
return
}
}
log.Warnf("[vote] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError)
}
// Response to append entries request
func (s *PeerServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
aereq := &raft.AppendEntriesRequest{}
err := decodeJsonRequest(req, aereq)
if err == nil {
log.Debugf("[recv] POST %s/log/append [%d]", s.url, len(aereq.Entries))
s.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength))
if resp := s.AppendEntries(aereq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
if !resp.Success {
log.Debugf("[Append Entry] Step back")
}
return
}
}
log.Warnf("[Append Entry] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError)
}
// Response to recover from snapshot request
func (s *PeerServer) SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
aereq := &raft.SnapshotRequest{}
err := decodeJsonRequest(req, aereq)
if err == nil {
log.Debugf("[recv] POST %s/snapshot/ ", s.url)
if resp := s.RequestSnapshot(aereq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
return
}
}
log.Warnf("[Snapshot] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError)
}
// Response to recover from snapshot request
func (s *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
aereq := &raft.SnapshotRecoveryRequest{}
err := decodeJsonRequest(req, aereq)
if err == nil {
log.Debugf("[recv] POST %s/snapshotRecovery/ ", s.url)
if resp := s.SnapshotRecoveryRequest(aereq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
return
}
}
log.Warnf("[Snapshot] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError)
}
// Get the port that listening for etcd connecting of the server
func (s *PeerServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) {
log.Debugf("[recv] Get %s/etcdURL/ ", s.url)
w.WriteHeader(http.StatusOK)
w.Write([]byte(argInfo.EtcdURL))
}
// Response to the join request
func (s *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) error {
command := &JoinCommand{}
if err := decodeJsonRequest(req, command); err == nil {
log.Debugf("Receive Join Request from %s", command.Name)
return s.dispatchRaftCommand(command, w, req)
} else {
w.WriteHeader(http.StatusInternalServerError)
return nil
}
}
// Response to remove request
func (s *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request) {
if req.Method != "DELETE" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
nodeName := req.URL.Path[len("/remove/"):]
command := &RemoveCommand{
Name: nodeName,
}
log.Debugf("[recv] Remove Request [%s]", command.Name)
s.dispatchRaftCommand(command, w, req)
}
// Response to the name request
func (s *PeerServer) NameHttpHandler(w http.ResponseWriter, req *http.Request) {
log.Debugf("[recv] Get %s/name/ ", s.url)
w.WriteHeader(http.StatusOK)
w.Write([]byte(s.name))
}
// Response to the name request
func (s *PeerServer) RaftVersionHttpHandler(w http.ResponseWriter, req *http.Request) {
log.Debugf("[recv] Get %s/version/ ", s.url)
w.WriteHeader(http.StatusOK)
w.Write([]byte(PeerVersion))
}
func (s *PeerServer) dispatchRaftCommand(c raft.Command, w http.ResponseWriter, req *http.Request) error {
return s.dispatch(c, w, req, nameToRaftURL)
}
func (s *PeerServer) startAsLeader() {
// leader need to join self as a peer
for {
_, err := s.Do(newJoinCommand(PeerVersion, s.Name(), s.url, e.url))
if err == nil {
break
}
}
log.Debugf("%s start as a leader", s.name)
}
func (s *PeerServer) startAsFollower(cluster []string) {
// start as a follower in a existing cluster
for i := 0; i < retryTimes; i++ {
ok := s.joinCluster(cluster)
if ok {
return
}
log.Warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval)
time.Sleep(time.Second * RetryInterval)
}
fatalf("Cannot join the cluster via given machines after %x retries", retryTimes)
}
// Start to listen and response raft command
func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) {
infof("raft server [name %s, listen on %s, advertised url %s]", s.name, s.listenHost, s.url)
raftMux := http.NewServeMux()
server := &http.Server{
Handler: raftMux,
TLSConfig: &tlsConf,
Addr: s.listenHost,
}
// internal commands
raftMux.HandleFunc("/name", s.NameHttpHandler)
raftMux.HandleFunc("/version", s.RaftVersionHttpHandler)
raftMux.Handle("/join", errorHandler(s.JoinHttpHandler))
raftMux.HandleFunc("/remove/", s.RemoveHttpHandler)
raftMux.HandleFunc("/vote", s.VoteHttpHandler)
raftMux.HandleFunc("/log", s.GetLogHttpHandler)
raftMux.HandleFunc("/log/append", s.AppendEntriesHttpHandler)
raftMux.HandleFunc("/snapshot", s.SnapshotHttpHandler)
raftMux.HandleFunc("/snapshotRecovery", s.SnapshotRecoveryHttpHandler)
raftMux.HandleFunc("/etcdURL", s.EtcdURLHttpHandler)
if scheme == "http" {
fatal(server.ListenAndServe())
} else {
fatal(server.ListenAndServeTLS(s.tlsInfo.CertFile, s.tlsInfo.KeyFile))
}
}
// getVersion fetches the raft version of a peer. This works for now but we
// will need to do something more sophisticated later when we allow mixed
// version clusters.
func getVersion(t *transporter, versionURL url.URL) (string, error) {
resp, req, err := t.Get(versionURL.String())
if err != nil {
return "", err
}
defer resp.Body.Close()
t.CancelWhenTimeout(req)
body, err := ioutil.ReadAll(resp.Body)
return string(body), nil
}
func (s *PeerServer) joinCluster(cluster []string) bool {
for _, machine := range cluster {
if len(machine) == 0 {
continue
}
err := s.joinByMachine(s.Server, machine, s.tlsConf.Scheme)
if err == nil {
log.Debugf("%s success join to the cluster via machine %s", s.name, machine)
return true
} else {
if _, ok := err.(etcdErr.Error); ok {
fatal(err)
}
log.Debugf("cannot join to cluster via machine %s %s", machine, err)
}
}
return false
}
// Send join requests to machine.
func (s *PeerServer) joinByMachine(server *raft.Server, machine string, scheme string) error {
var b bytes.Buffer
// t must be ok
t, _ := server.Transporter().(*transporter)
// Our version must match the leaders version
versionURL := url.URL{Host: machine, Scheme: scheme, Path: "/version"}
version, err := getVersion(t, versionURL)
if err != nil {
return fmt.Errorf("Unable to join: %v", err)
}
// TODO: versioning of the internal protocol. See:
// Documentation/internatl-protocol-versioning.md
if version != PeerVersion {
return fmt.Errorf("Unable to join: internal version mismatch, entire cluster must be running identical versions of etcd")
}
json.NewEncoder(&b).Encode(newJoinCommand(PeerVersion, server.Name(), server.url, e.url))
joinURL := url.URL{Host: machine, Scheme: scheme, Path: "/join"}
log.Debugf("Send Join Request to %s", joinURL.String())
resp, req, err := t.Post(joinURL.String(), &b)
for {
if err != nil {
return fmt.Errorf("Unable to join: %v", err)
}
if resp != nil {
defer resp.Body.Close()
t.CancelWhenTimeout(req)
if resp.StatusCode == http.StatusOK {
b, _ := ioutil.ReadAll(resp.Body)
server.joinIndex, _ = binary.Uvarint(b)
return nil
}
if resp.StatusCode == http.StatusTemporaryRedirect {
address := resp.Header.Get("Location")
log.Debugf("Send Join Request to %s", address)
json.NewEncoder(&b).Encode(newJoinCommand(PeerVersion, server.Name(), server.url, e.url))
resp, req, err = t.Post(address, &b)
} else if resp.StatusCode == http.StatusBadRequest {
debug("Reach max number machines in the cluster")
decoder := json.NewDecoder(resp.Body)
err := &etcdErr.Error{}
decoder.Decode(err)
return *err
} else {
return fmt.Errorf("Unable to join")
}
}
}
return fmt.Errorf("Unable to join: %v", err)
}
func (s *PeerServer) Stats() []byte {
s.serverStats.LeaderInfo.Uptime = time.Now().Sub(s.serverStats.LeaderInfo.startTime).String()
queue := s.serverStats.sendRateQueue
s.serverStats.SendingPkgRate, s.serverStats.SendingBandwidthRate = queue.Rate()
queue = s.serverStats.recvRateQueue
s.serverStats.RecvingPkgRate, s.serverStats.RecvingBandwidthRate = queue.Rate()
b, _ := json.Marshal(s.serverStats)
return b
}
func (s *PeerServer) PeerStats() []byte {
if s.State() == raft.Leader {
b, _ := json.Marshal(s.followersStats)
return b
}
return nil
}
func (s *PeerServer) monitorSnapshot() {
for {
time.Sleep(s.snapConf.checkingInterval)
currentWrites := 0
if uint64(currentWrites) > s.snapConf.writesThr {
r.TakeSnapshot()
s.snapConf.lastWrites = 0
}
}
}

167
server/registry.go Normal file
View File

@@ -0,0 +1,167 @@
package server
import (
"sync"
"github.com/coreos/etcd/store"
)
// The location of the machine URL data.
const RegistryKey = "/_etcd/machines"
// The Registry stores URL information for nodes.
type Registry struct {
sync.Mutex
store *store.Store
nodes map[string]*node
}
// The internal storage format of the registry.
type node struct {
peerVersion string
peerURL string
url string
}
// Creates a new Registry.
func NewRegistry(s *store.Store) *Registry {
return &Registry{
store: s,
nodes: make(map[string]*node),
}
}
// Adds a node to the registry.
func (r *Registry) Register(name string, peerVersion string, peerURL string, url string, commitIndex uint64, term uint64) {
r.Lock()
defer r.Unlock()
// Write data to store.
key := path.Join(RegistryKey, name)
value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", peerURL, url, peerVersion)
r.store.Create(key, value, false, false, store.Permanent, commitIndex, term)
}
// Removes a node from the registry.
func (r *Registry) Unregister(name string, commitIndex uint64, term uint64) error {
r.Lock()
defer r.Unlock()
// Remove the key from the store.
_, err := s.Delete(path.Join(RegistryKey, name), false, commitIndex, term)
return err
}
// Returns the number of nodes in the cluster.
func (r *Registry) Count() int {
e, err := s.Get(RegistryKey, false, false, 0, 0)
if err != nil {
return 0
}
return len(e.KVPairs)
}
// Retrieves the URL for a given node by name.
func (r *Registry) URL(name string) (string, bool) {
r.Lock()
defer r.Unlock()
return r.url(name)
}
func (r *Registry) url(name string) (string, bool) {
if r.nodes[name] == nil {
r.load(name)
}
if node := r.nodes[name]; node != nil {
return node.url, true
}
return "", false
}
// Retrieves the URLs for all nodes.
func (r *Registry) URLs() []string {
r.Lock()
defer r.Unlock()
// Retrieve a list of all nodes.
e, err := s.Get(RegistryKey, false, false, 0, 0)
if err != nil {
return make([]string, 0)
}
// Lookup the URL for each one.
urls := make([]string, 0)
for _, pair := range e.KVPairs {
urls = append(urls, r.url(pair.Key))
}
return urls
}
// Retrieves the peer URL for a given node by name.
func (r *Registry) PeerURL(name string) (string, bool) {
r.Lock()
defer r.Unlock()
return r.peerURL(name)
}
func (r *Registry) peerURL(name string) (string, bool) {
if r.nodes[name] == nil {
r.load(name)
}
if node := r.nodes[name]; node != nil {
return node.peerURL, true
}
return "", false
}
// Retrieves the peer URLs for all nodes.
func (r *Registry) PeerURLs() []string {
r.Lock()
defer r.Unlock()
// Retrieve a list of all nodes.
e, err := s.Get(RegistryKey, false, false, 0, 0)
if err != nil {
return make([]string, 0)
}
// Lookup the URL for each one.
urls := make([]string, 0)
for _, pair := range e.KVPairs {
urls = append(urls, r.peerURL(pair.Key))
}
return urls
}
// Loads the given node by name from the store into the cache.
func (r *Registry) load(name string) {
if name == "" {
return
}
// Retrieve from store.
e, err := etcdStore.Get(path.Join(RegistryKey, name), false, false, 0, 0)
if err != nil {
return
}
// Parse as a query string.
m, err := url.ParseQuery(e.Value)
if err != nil {
panic(fmt.Sprintf("Failed to parse machines entry: %s", name))
}
// Create node.
r.nodes[name] := &node{
url: m["etcd"][0],
peerURL: m["raft"][0],
peerVersion: m["raftVersion"][0],
}
}

View File

@@ -25,22 +25,20 @@ func (c *RemoveCommand) CommandName() string {
// Remove a server from the cluster
func (c *RemoveCommand) Apply(server *raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(*store.Store)
r, _ := server.Context().(*RaftServer)
ps, _ := server.Context().(*PeerServer)
// remove machine in etcd storage
key := path.Join("_etcd/machines", c.Name)
// Remove node from the shared registry.
err := ps.registry.Unregister(c.Name, server.CommitIndex(), server.Term())
_, err := s.Delete(key, false, server.CommitIndex(), server.Term())
// delete from stats
delete(r.followersStats.Followers, c.Name)
// Delete from stats
delete(ps.followersStats.Followers, c.Name)
if err != nil {
return []byte{0}, err
}
// remove peer in raft
// Remove peer in raft
err = server.RemovePeer(c.Name)
if err != nil {
return []byte{0}, err
}
@@ -52,7 +50,7 @@ func (c *RemoveCommand) Apply(server *raft.Server) (interface{}, error) {
// and the node has sent out a join request in this
// start. It is sure that this node received a new remove
// command and need to be removed
if server.CommitIndex() > r.joinIndex && r.joinIndex != 0 {
if server.CommitIndex() > ps.joinIndex && ps.joinIndex != 0 {
debugf("server [%s] is removed", server.Name())
os.Exit(0)
} else {

View File

@@ -4,7 +4,6 @@ import (
"net/http"
"net/url"
"github.com/coreos/etcd/command"
"github.com/coreos/go-raft"
"github.com/gorilla/mux"
)
@@ -13,7 +12,7 @@ import (
type Server interface {
CommitIndex() uint64
Term() uint64
Dispatch(command.Command, http.ResponseWriter, *http.Request)
Dispatch(raft.Command, http.ResponseWriter, *http.Request)
}
// This is the default implementation of the Server interface.

11
server/timeout.go Normal file
View File

@@ -0,0 +1,11 @@
package server
const (
// The amount of time to elapse without a heartbeat before becoming a candidate.
ElectionTimeout = 200 * time.Millisecond
// The frequency by which heartbeats are sent to followers.
HeartbeatTimeout = 50 * time.Millisecond
RetryInterval = 10
)

226
server/transporter.go Normal file
View File

@@ -0,0 +1,226 @@
package server
import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"time"
"github.com/coreos/go-raft"
)
// Timeout for setup internal raft http connection
// This should not exceed 3 * RTT
var dailTimeout = 3 * HeartbeatTimeout
// Timeout for setup internal raft http connection + receive response header
// This should not exceed 3 * RTT + RTT
var responseHeaderTimeout = 4 * HeartbeatTimeout
// Timeout for receiving the response body from the server
// This should not exceed election timeout
var tranTimeout = ElectionTimeout
// Transporter layer for communication between raft nodes
type transporter struct {
client *http.Client
transport *http.Transport
raftServer *raftServer
}
// Create transporter using by raft server
// Create http or https transporter based on
// whether the user give the server cert and key
func newTransporter(scheme string, tlsConf tls.Config, raftServer *raftServer) *transporter {
t := transporter{}
tr := &http.Transport{
Dial: dialWithTimeout,
ResponseHeaderTimeout: responseHeaderTimeout,
}
if scheme == "https" {
tr.TLSClientConfig = &tlsConf
tr.DisableCompression = true
}
t.client = &http.Client{Transport: tr}
t.transport = tr
t.raftServer = raftServer
return &t
}
// Dial with timeout
func dialWithTimeout(network, addr string) (net.Conn, error) {
return net.DialTimeout(network, addr, dailTimeout)
}
// Sends AppendEntries RPCs to a peer when the server is the leader.
func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
var aersp *raft.AppendEntriesResponse
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
size := b.Len()
t.raftServer.serverStats.SendAppendReq(size)
u, _ := nameToRaftURL(peer.Name)
debugf("Send LogEntries to %s ", u)
thisFollowerStats, ok := t.raftServer.followersStats.Followers[peer.Name]
if !ok { //this is the first time this follower has been seen
thisFollowerStats = &raftFollowerStats{}
thisFollowerStats.Latency.Minimum = 1 << 63
t.raftServer.followersStats.Followers[peer.Name] = thisFollowerStats
}
start := time.Now()
resp, httpRequest, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
end := time.Now()
if err != nil {
debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
if ok {
thisFollowerStats.Fail()
}
} else {
if ok {
thisFollowerStats.Succ(end.Sub(start))
}
}
if resp != nil {
defer resp.Body.Close()
t.CancelWhenTimeout(httpRequest)
aersp = &raft.AppendEntriesResponse{}
if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
return aersp
}
}
return aersp
}
// Sends RequestVote RPCs to a peer when the server is the candidate.
func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
var rvrsp *raft.RequestVoteResponse
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
u, _ := nameToRaftURL(peer.Name)
debugf("Send Vote to %s", u)
resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
if err != nil {
debugf("Cannot send VoteRequest to %s : %s", u, err)
}
if resp != nil {
defer resp.Body.Close()
t.CancelWhenTimeout(httpRequest)
rvrsp := &raft.RequestVoteResponse{}
if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF {
return rvrsp
}
}
return rvrsp
}
// Sends SnapshotRequest RPCs to a peer when the server is the candidate.
func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
var aersp *raft.SnapshotResponse
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
u, _ := nameToRaftURL(peer.Name)
debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u,
req.LastTerm, req.LastIndex)
resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
if err != nil {
debugf("Cannot send SendSnapshotRequest to %s : %s", u, err)
}
if resp != nil {
defer resp.Body.Close()
t.CancelWhenTimeout(httpRequest)
aersp = &raft.SnapshotResponse{}
if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
return aersp
}
}
return aersp
}
// Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
var aersp *raft.SnapshotRecoveryResponse
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
u, _ := nameToRaftURL(peer.Name)
debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u,
req.LastTerm, req.LastIndex)
resp, _, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
if err != nil {
debugf("Cannot send SendSnapshotRecoveryRequest to %s : %s", u, err)
}
if resp != nil {
defer resp.Body.Close()
aersp = &raft.SnapshotRecoveryResponse{}
if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
return aersp
}
}
return aersp
}
// Send server side POST request
func (t *transporter) Post(urlStr string, body io.Reader) (*http.Response, *http.Request, error) {
req, _ := http.NewRequest("POST", urlStr, body)
resp, err := t.client.Do(req)
return resp, req, err
}
// Send server side GET request
func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error) {
req, _ := http.NewRequest("GET", urlStr, nil)
resp, err := t.client.Do(req)
return resp, req, err
}
// Cancel the on fly HTTP transaction when timeout happens.
func (t *transporter) CancelWhenTimeout(req *http.Request) {
go func() {
time.Sleep(ElectionTimeout)
t.transport.CancelRequest(req)
}()
}

View File

@@ -0,0 +1,61 @@
package server
import (
"crypto/tls"
"fmt"
"io/ioutil"
"net/http"
"testing"
"time"
)
func TestTransporterTimeout(t *testing.T) {
http.HandleFunc("/timeout", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "timeout")
w.(http.Flusher).Flush() // send headers and some body
time.Sleep(time.Second * 100)
})
go http.ListenAndServe(":8080", nil)
conf := tls.Config{}
ts := newTransporter("http", conf, nil)
ts.Get("http://google.com")
_, _, err := ts.Get("http://google.com:9999")
if err == nil {
t.Fatal("timeout error")
}
res, req, err := ts.Get("http://localhost:8080/timeout")
if err != nil {
t.Fatal("should not timeout")
}
ts.CancelWhenTimeout(req)
body, err := ioutil.ReadAll(res.Body)
if err == nil {
fmt.Println(string(body))
t.Fatal("expected an error reading the body")
}
_, _, err = ts.Post("http://google.com:9999", nil)
if err == nil {
t.Fatal("timeout error")
}
_, _, err = ts.Get("http://www.google.com")
if err != nil {
t.Fatal("get error: ", err.Error())
}
_, _, err = ts.Post("http://www.google.com", nil)
if err != nil {
t.Fatal("post error")
}
}

17
server/util.go Normal file
View File

@@ -0,0 +1,17 @@
package server
import (
"fmt"
"net/http"
"github.com/coreos/etcd/log"
)
func decodeJsonRequest(req *http.Request, data interface{}) error {
decoder := json.NewDecoder(req.Body)
if err := decoder.Decode(&data); err != nil && err != io.EOF {
log.Warnf("Malformed json request: %v", err)
return fmt.Errorf("Malformed json request: %v", err)
}
return nil
}

View File

@@ -12,10 +12,6 @@ import (
"github.com/coreos/go-raft"
)
//-------------------------------------------------------------------
// Handlers to handle etcd-store related request via etcd url
//-------------------------------------------------------------------
func NewEtcdMuxer() *http.ServeMux {
// external commands
router := mux.NewRouter()

8
server/version.go Normal file
View File

@@ -0,0 +1,8 @@
package server
const Version = "v2"
// TODO: The release version (generated from the git tag) will be the raft
// protocol version for now. When things settle down we will fix it like the
// client API above.
const PeerVersion = releaseVersion