Use raft.Server interface.

This commit is contained in:
Ben Johnson
2013-10-14 13:05:55 -06:00
parent 4f5ec77f87
commit e7598075ac
33 changed files with 384 additions and 328 deletions

View File

@@ -35,7 +35,7 @@ func (c *JoinCommand) CommandName() string {
}
// Join a server to the cluster
func (c *JoinCommand) Apply(server *raft.Server) (interface{}, error) {
func (c *JoinCommand) Apply(server raft.Server) (interface{}, error) {
ps, _ := server.Context().(*PeerServer)
b := make([]byte, 8)
@@ -62,7 +62,7 @@ func (c *JoinCommand) Apply(server *raft.Server) (interface{}, error) {
err := server.AddPeer(c.Name, "")
// Add peer stats
if c.Name != ps.Name() {
if c.Name != ps.RaftServer().Name() {
ps.followersStats.Followers[c.Name] = &raftFollowerStats{}
ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
}

View File

@@ -19,7 +19,7 @@ import (
)
type PeerServer struct {
*raft.Server
raftServer raft.Server
server *Server
joinIndex uint64
name string
@@ -78,12 +78,12 @@ func NewPeerServer(name string, path string, url string, listenHost string, tlsC
raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, s)
// Create raft server
server, err := raft.NewServer(name, path, raftTransporter, s.store, s, "")
raftServer, err := raft.NewServer(name, path, raftTransporter, s.store, s, "")
if err != nil {
log.Fatal(err)
}
s.Server = server
s.raftServer = raftServer
return s
}
@@ -92,7 +92,7 @@ func NewPeerServer(name string, path string, url string, listenHost string, tlsC
func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) {
// LoadSnapshot
if snapshot {
err := s.LoadSnapshot()
err := s.raftServer.LoadSnapshot()
if err == nil {
log.Debugf("%s finished load snapshot", s.name)
@@ -101,12 +101,12 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) {
}
}
s.SetElectionTimeout(ElectionTimeout)
s.SetHeartbeatTimeout(HeartbeatTimeout)
s.raftServer.SetElectionTimeout(ElectionTimeout)
s.raftServer.SetHeartbeatTimeout(HeartbeatTimeout)
s.Start()
s.raftServer.Start()
if s.IsLogEmpty() {
if s.raftServer.IsLogEmpty() {
// start as a leader in a new cluster
if len(cluster) == 0 {
s.startAsLeader()
@@ -116,7 +116,7 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) {
} else {
// Rejoin the previous cluster
cluster = s.registry.PeerURLs(s.Leader(), s.name)
cluster = s.registry.PeerURLs(s.raftServer.Leader(), s.name)
for i := 0; i < len(cluster); i++ {
u, err := url.Parse(cluster[i])
if err != nil {
@@ -143,8 +143,8 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) {
}
// Retrieves the underlying Raft server.
func (s *PeerServer) RaftServer() *raft.Server {
return s.Server
func (s *PeerServer) RaftServer() raft.Server {
return s.raftServer
}
// Associates the client server with the peer server.
@@ -155,7 +155,7 @@ func (s *PeerServer) SetServer(server *Server) {
func (s *PeerServer) startAsLeader() {
// leader need to join self as a peer
for {
_, err := s.Do(NewJoinCommand(PeerVersion, s.Name(), s.url, s.server.URL()))
_, err := s.raftServer.Do(NewJoinCommand(PeerVersion, s.raftServer.Name(), s.url, s.server.URL()))
if err == nil {
break
}
@@ -232,7 +232,7 @@ func (s *PeerServer) joinCluster(cluster []string) bool {
continue
}
err := s.joinByMachine(s.Server, machine, s.tlsConf.Scheme)
err := s.joinByMachine(s.raftServer, machine, s.tlsConf.Scheme)
if err == nil {
log.Debugf("%s success join to the cluster via machine %s", s.name, machine)
return true
@@ -249,7 +249,7 @@ func (s *PeerServer) joinCluster(cluster []string) bool {
}
// Send join requests to machine.
func (s *PeerServer) joinByMachine(server *raft.Server, machine string, scheme string) error {
func (s *PeerServer) joinByMachine(server raft.Server, machine string, scheme string) error {
var b bytes.Buffer
// t must be ok
@@ -327,7 +327,7 @@ func (s *PeerServer) Stats() []byte {
}
func (s *PeerServer) PeerStats() []byte {
if s.State() == raft.Leader {
if s.raftServer.State() == raft.Leader {
b, _ := json.Marshal(s.followersStats)
return b
}
@@ -339,15 +339,15 @@ func (s *PeerServer) monitorSnapshot() {
time.Sleep(s.snapConf.checkingInterval)
currentWrites := 0
if uint64(currentWrites) > s.snapConf.writesThr {
s.TakeSnapshot()
s.raftServer.TakeSnapshot()
s.snapConf.lastWrites = 0
}
}
}
func (s *PeerServer) dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error {
if s.State() == raft.Leader {
result, err := s.Do(c)
if s.raftServer.State() == raft.Leader {
result, err := s.raftServer.Do(c)
if err != nil {
return err
}
@@ -375,7 +375,7 @@ func (s *PeerServer) dispatch(c raft.Command, w http.ResponseWriter, req *http.R
return nil
} else {
leader := s.Leader()
leader := s.raftServer.Leader()
// No leader available.
if leader == "" {

View File

@@ -14,7 +14,7 @@ func (s *PeerServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request)
log.Debugf("[recv] GET %s/log", s.url)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(s.LogEntries())
json.NewEncoder(w).Encode(s.raftServer.LogEntries())
}
// Response to vote request
@@ -23,7 +23,7 @@ func (s *PeerServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
err := decodeJsonRequest(req, rvreq)
if err == nil {
log.Debugf("[recv] POST %s/vote [%s]", s.url, rvreq.CandidateName)
if resp := s.RequestVote(rvreq); resp != nil {
if resp := s.raftServer.RequestVote(rvreq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
return
@@ -43,7 +43,7 @@ func (s *PeerServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.R
s.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength))
if resp := s.AppendEntries(aereq); resp != nil {
if resp := s.raftServer.AppendEntries(aereq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
if !resp.Success {
@@ -62,7 +62,7 @@ func (s *PeerServer) SnapshotHttpHandler(w http.ResponseWriter, req *http.Reques
err := decodeJsonRequest(req, aereq)
if err == nil {
log.Debugf("[recv] POST %s/snapshot/ ", s.url)
if resp := s.RequestSnapshot(aereq); resp != nil {
if resp := s.raftServer.RequestSnapshot(aereq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
return
@@ -78,7 +78,7 @@ func (s *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *htt
err := decodeJsonRequest(req, aereq)
if err == nil {
log.Debugf("[recv] POST %s/snapshotRecovery/ ", s.url)
if resp := s.SnapshotRecoveryRequest(aereq); resp != nil {
if resp := s.raftServer.SnapshotRecoveryRequest(aereq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
return

View File

@@ -23,7 +23,7 @@ func (c *RemoveCommand) CommandName() string {
}
// Remove a server from the cluster
func (c *RemoveCommand) Apply(server *raft.Server) (interface{}, error) {
func (c *RemoveCommand) Apply(server raft.Server) (interface{}, error) {
ps, _ := server.Context().(*PeerServer)
// Remove node from the shared registry.

View File

@@ -56,22 +56,22 @@ func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsI
// The current state of the server in the cluster.
func (s *Server) State() string {
return s.peerServer.State()
return s.peerServer.RaftServer().State()
}
// The node name of the leader in the cluster.
func (s *Server) Leader() string {
return s.peerServer.Leader()
return s.peerServer.RaftServer().Leader()
}
// The current Raft committed index.
func (s *Server) CommitIndex() uint64 {
return s.peerServer.CommitIndex()
return s.peerServer.RaftServer().CommitIndex()
}
// The current Raft term.
func (s *Server) Term() uint64 {
return s.peerServer.Term()
return s.peerServer.RaftServer().Term()
}
// The server URL.
@@ -201,7 +201,7 @@ func (s *Server) GetVersionHandler(w http.ResponseWriter, req *http.Request) err
// Handler to return the current leader's raft address
func (s *Server) GetLeaderHandler(w http.ResponseWriter, req *http.Request) error {
leader := s.peerServer.Leader()
leader := s.peerServer.RaftServer().Leader()
if leader == "" {
return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", store.UndefIndex, store.UndefTerm)
}
@@ -213,7 +213,7 @@ func (s *Server) GetLeaderHandler(w http.ResponseWriter, req *http.Request) erro
// Handler to return all the known machines in the current cluster.
func (s *Server) GetMachinesHandler(w http.ResponseWriter, req *http.Request) error {
machines := s.registry.ClientURLs(s.peerServer.Leader(), s.name)
machines := s.registry.ClientURLs(s.peerServer.RaftServer().Leader(), s.name)
w.WriteHeader(http.StatusOK)
w.Write([]byte(strings.Join(machines, ", ")))
return nil
@@ -227,12 +227,12 @@ func (s *Server) GetStatsHandler(w http.ResponseWriter, req *http.Request) error
// Retrieves stats on the leader.
func (s *Server) GetLeaderStatsHandler(w http.ResponseWriter, req *http.Request) error {
if s.peerServer.State() == raft.Leader {
if s.peerServer.RaftServer().State() == raft.Leader {
w.Write(s.peerServer.PeerStats())
return nil
}
leader := s.peerServer.Leader()
leader := s.peerServer.RaftServer().Leader()
if leader == "" {
return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
}
@@ -259,7 +259,7 @@ func (s *Server) SpeedTestHandler(w http.ResponseWriter, req *http.Request) erro
Value: "bar",
ExpireTime: time.Unix(0, 0),
}
s.peerServer.Do(c)
s.peerServer.RaftServer().Do(c)
}
c <- true
}()

View File

@@ -62,7 +62,7 @@ func dialWithTimeout(network, addr string) (net.Conn, error) {
}
// Sends AppendEntries RPCs to a peer when the server is the leader.
func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
var aersp *raft.AppendEntriesResponse
var b bytes.Buffer
@@ -117,7 +117,7 @@ func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.P
}
// Sends RequestVote RPCs to a peer when the server is the candidate.
func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
func (t *transporter) SendVoteRequest(server raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
var rvrsp *raft.RequestVoteResponse
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
@@ -146,7 +146,7 @@ func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req
}
// Sends SnapshotRequest RPCs to a peer when the server is the candidate.
func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
func (t *transporter) SendSnapshotRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
var aersp *raft.SnapshotResponse
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
@@ -177,7 +177,7 @@ func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer,
}
// Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
func (t *transporter) SendSnapshotRecoveryRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
var aersp *raft.SnapshotRecoveryResponse
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)