From e7598075ac85512c419bb0c27e055799f6bbb2e9 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Mon, 14 Oct 2013 13:05:55 -0600 Subject: [PATCH] Use raft.Server interface. --- server/join_command.go | 4 +- server/peer_server.go | 38 +-- server/peer_server_handlers.go | 10 +- server/remove_command.go | 2 +- server/server.go | 18 +- server/transporter.go | 8 +- store/create_command.go | 2 +- store/delete_command.go | 2 +- store/test_and_set_command.go | 2 +- store/update_command.go | 2 +- .../github.com/coreos/go-etcd/README.md | 6 +- .../github.com/coreos/go-etcd/etcd/delete.go | 5 +- .../github.com/coreos/go-etcd/etcd/get.go | 13 +- .../coreos/go-etcd/etcd/response.go | 26 ++ .../github.com/coreos/go-etcd/etcd/set.go | 9 +- .../coreos/go-etcd/etcd/testAndSet.go | 5 +- .../github.com/coreos/go-etcd/etcd/watch.go | 7 +- .../coreos/go-etcd/etcd/watch_test.go | 5 +- .../github.com/coreos/go-raft/README.md | 2 +- .../github.com/coreos/go-raft/command.go | 2 +- .../coreos/go-raft/http_transporter.go | 18 +- .../coreos/go-raft/http_transporter_test.go | 18 +- .../github.com/coreos/go-raft/join_command.go | 4 +- .../coreos/go-raft/leave_command.go | 4 +- .../github.com/coreos/go-raft/nop_command.go | 2 +- third_party/github.com/coreos/go-raft/peer.go | 4 +- .../github.com/coreos/go-raft/server.go | 159 ++++++---- .../github.com/coreos/go-raft/server_test.go | 289 +++++++++--------- .../github.com/coreos/go-raft/snapshot.go | 6 +- .../go-raft/snapshot_recovery_request.go | 2 +- third_party/github.com/coreos/go-raft/test.go | 28 +- .../github.com/coreos/go-raft/transporter.go | 8 +- web/web.go | 2 +- 33 files changed, 384 insertions(+), 328 deletions(-) create mode 100644 third_party/github.com/coreos/go-etcd/etcd/response.go diff --git a/server/join_command.go b/server/join_command.go index 83d2efb73..7bebbe704 100644 --- a/server/join_command.go +++ b/server/join_command.go @@ -35,7 +35,7 @@ func (c *JoinCommand) CommandName() string { } // Join a server to the cluster -func (c *JoinCommand) Apply(server *raft.Server) (interface{}, error) { +func (c *JoinCommand) Apply(server raft.Server) (interface{}, error) { ps, _ := server.Context().(*PeerServer) b := make([]byte, 8) @@ -62,7 +62,7 @@ func (c *JoinCommand) Apply(server *raft.Server) (interface{}, error) { err := server.AddPeer(c.Name, "") // Add peer stats - if c.Name != ps.Name() { + if c.Name != ps.RaftServer().Name() { ps.followersStats.Followers[c.Name] = &raftFollowerStats{} ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63 } diff --git a/server/peer_server.go b/server/peer_server.go index 97a7757a3..d85ed9a21 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -19,7 +19,7 @@ import ( ) type PeerServer struct { - *raft.Server + raftServer raft.Server server *Server joinIndex uint64 name string @@ -78,12 +78,12 @@ func NewPeerServer(name string, path string, url string, listenHost string, tlsC raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, s) // Create raft server - server, err := raft.NewServer(name, path, raftTransporter, s.store, s, "") + raftServer, err := raft.NewServer(name, path, raftTransporter, s.store, s, "") if err != nil { log.Fatal(err) } - s.Server = server + s.raftServer = raftServer return s } @@ -92,7 +92,7 @@ func NewPeerServer(name string, path string, url string, listenHost string, tlsC func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) { // LoadSnapshot if snapshot { - err := s.LoadSnapshot() + err := s.raftServer.LoadSnapshot() if err == nil { log.Debugf("%s finished load snapshot", s.name) @@ -101,12 +101,12 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) { } } - s.SetElectionTimeout(ElectionTimeout) - s.SetHeartbeatTimeout(HeartbeatTimeout) + s.raftServer.SetElectionTimeout(ElectionTimeout) + s.raftServer.SetHeartbeatTimeout(HeartbeatTimeout) - s.Start() + s.raftServer.Start() - if s.IsLogEmpty() { + if s.raftServer.IsLogEmpty() { // start as a leader in a new cluster if len(cluster) == 0 { s.startAsLeader() @@ -116,7 +116,7 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) { } else { // Rejoin the previous cluster - cluster = s.registry.PeerURLs(s.Leader(), s.name) + cluster = s.registry.PeerURLs(s.raftServer.Leader(), s.name) for i := 0; i < len(cluster); i++ { u, err := url.Parse(cluster[i]) if err != nil { @@ -143,8 +143,8 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) { } // Retrieves the underlying Raft server. -func (s *PeerServer) RaftServer() *raft.Server { - return s.Server +func (s *PeerServer) RaftServer() raft.Server { + return s.raftServer } // Associates the client server with the peer server. @@ -155,7 +155,7 @@ func (s *PeerServer) SetServer(server *Server) { func (s *PeerServer) startAsLeader() { // leader need to join self as a peer for { - _, err := s.Do(NewJoinCommand(PeerVersion, s.Name(), s.url, s.server.URL())) + _, err := s.raftServer.Do(NewJoinCommand(PeerVersion, s.raftServer.Name(), s.url, s.server.URL())) if err == nil { break } @@ -232,7 +232,7 @@ func (s *PeerServer) joinCluster(cluster []string) bool { continue } - err := s.joinByMachine(s.Server, machine, s.tlsConf.Scheme) + err := s.joinByMachine(s.raftServer, machine, s.tlsConf.Scheme) if err == nil { log.Debugf("%s success join to the cluster via machine %s", s.name, machine) return true @@ -249,7 +249,7 @@ func (s *PeerServer) joinCluster(cluster []string) bool { } // Send join requests to machine. -func (s *PeerServer) joinByMachine(server *raft.Server, machine string, scheme string) error { +func (s *PeerServer) joinByMachine(server raft.Server, machine string, scheme string) error { var b bytes.Buffer // t must be ok @@ -327,7 +327,7 @@ func (s *PeerServer) Stats() []byte { } func (s *PeerServer) PeerStats() []byte { - if s.State() == raft.Leader { + if s.raftServer.State() == raft.Leader { b, _ := json.Marshal(s.followersStats) return b } @@ -339,15 +339,15 @@ func (s *PeerServer) monitorSnapshot() { time.Sleep(s.snapConf.checkingInterval) currentWrites := 0 if uint64(currentWrites) > s.snapConf.writesThr { - s.TakeSnapshot() + s.raftServer.TakeSnapshot() s.snapConf.lastWrites = 0 } } } func (s *PeerServer) dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error { - if s.State() == raft.Leader { - result, err := s.Do(c) + if s.raftServer.State() == raft.Leader { + result, err := s.raftServer.Do(c) if err != nil { return err } @@ -375,7 +375,7 @@ func (s *PeerServer) dispatch(c raft.Command, w http.ResponseWriter, req *http.R return nil } else { - leader := s.Leader() + leader := s.raftServer.Leader() // No leader available. if leader == "" { diff --git a/server/peer_server_handlers.go b/server/peer_server_handlers.go index ad055623e..adb192e60 100644 --- a/server/peer_server_handlers.go +++ b/server/peer_server_handlers.go @@ -14,7 +14,7 @@ func (s *PeerServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request) log.Debugf("[recv] GET %s/log", s.url) w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(s.LogEntries()) + json.NewEncoder(w).Encode(s.raftServer.LogEntries()) } // Response to vote request @@ -23,7 +23,7 @@ func (s *PeerServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request) { err := decodeJsonRequest(req, rvreq) if err == nil { log.Debugf("[recv] POST %s/vote [%s]", s.url, rvreq.CandidateName) - if resp := s.RequestVote(rvreq); resp != nil { + if resp := s.raftServer.RequestVote(rvreq); resp != nil { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) return @@ -43,7 +43,7 @@ func (s *PeerServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.R s.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength)) - if resp := s.AppendEntries(aereq); resp != nil { + if resp := s.raftServer.AppendEntries(aereq); resp != nil { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) if !resp.Success { @@ -62,7 +62,7 @@ func (s *PeerServer) SnapshotHttpHandler(w http.ResponseWriter, req *http.Reques err := decodeJsonRequest(req, aereq) if err == nil { log.Debugf("[recv] POST %s/snapshot/ ", s.url) - if resp := s.RequestSnapshot(aereq); resp != nil { + if resp := s.raftServer.RequestSnapshot(aereq); resp != nil { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) return @@ -78,7 +78,7 @@ func (s *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *htt err := decodeJsonRequest(req, aereq) if err == nil { log.Debugf("[recv] POST %s/snapshotRecovery/ ", s.url) - if resp := s.SnapshotRecoveryRequest(aereq); resp != nil { + if resp := s.raftServer.SnapshotRecoveryRequest(aereq); resp != nil { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) return diff --git a/server/remove_command.go b/server/remove_command.go index 6a8f75171..42e2c5078 100644 --- a/server/remove_command.go +++ b/server/remove_command.go @@ -23,7 +23,7 @@ func (c *RemoveCommand) CommandName() string { } // Remove a server from the cluster -func (c *RemoveCommand) Apply(server *raft.Server) (interface{}, error) { +func (c *RemoveCommand) Apply(server raft.Server) (interface{}, error) { ps, _ := server.Context().(*PeerServer) // Remove node from the shared registry. diff --git a/server/server.go b/server/server.go index 4e21aa179..d1b1abf0f 100644 --- a/server/server.go +++ b/server/server.go @@ -56,22 +56,22 @@ func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsI // The current state of the server in the cluster. func (s *Server) State() string { - return s.peerServer.State() + return s.peerServer.RaftServer().State() } // The node name of the leader in the cluster. func (s *Server) Leader() string { - return s.peerServer.Leader() + return s.peerServer.RaftServer().Leader() } // The current Raft committed index. func (s *Server) CommitIndex() uint64 { - return s.peerServer.CommitIndex() + return s.peerServer.RaftServer().CommitIndex() } // The current Raft term. func (s *Server) Term() uint64 { - return s.peerServer.Term() + return s.peerServer.RaftServer().Term() } // The server URL. @@ -201,7 +201,7 @@ func (s *Server) GetVersionHandler(w http.ResponseWriter, req *http.Request) err // Handler to return the current leader's raft address func (s *Server) GetLeaderHandler(w http.ResponseWriter, req *http.Request) error { - leader := s.peerServer.Leader() + leader := s.peerServer.RaftServer().Leader() if leader == "" { return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", store.UndefIndex, store.UndefTerm) } @@ -213,7 +213,7 @@ func (s *Server) GetLeaderHandler(w http.ResponseWriter, req *http.Request) erro // Handler to return all the known machines in the current cluster. func (s *Server) GetMachinesHandler(w http.ResponseWriter, req *http.Request) error { - machines := s.registry.ClientURLs(s.peerServer.Leader(), s.name) + machines := s.registry.ClientURLs(s.peerServer.RaftServer().Leader(), s.name) w.WriteHeader(http.StatusOK) w.Write([]byte(strings.Join(machines, ", "))) return nil @@ -227,12 +227,12 @@ func (s *Server) GetStatsHandler(w http.ResponseWriter, req *http.Request) error // Retrieves stats on the leader. func (s *Server) GetLeaderStatsHandler(w http.ResponseWriter, req *http.Request) error { - if s.peerServer.State() == raft.Leader { + if s.peerServer.RaftServer().State() == raft.Leader { w.Write(s.peerServer.PeerStats()) return nil } - leader := s.peerServer.Leader() + leader := s.peerServer.RaftServer().Leader() if leader == "" { return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm) } @@ -259,7 +259,7 @@ func (s *Server) SpeedTestHandler(w http.ResponseWriter, req *http.Request) erro Value: "bar", ExpireTime: time.Unix(0, 0), } - s.peerServer.Do(c) + s.peerServer.RaftServer().Do(c) } c <- true }() diff --git a/server/transporter.go b/server/transporter.go index 56d1b1edc..3c5002f1b 100644 --- a/server/transporter.go +++ b/server/transporter.go @@ -62,7 +62,7 @@ func dialWithTimeout(network, addr string) (net.Conn, error) { } // Sends AppendEntries RPCs to a peer when the server is the leader. -func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse { +func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse { var aersp *raft.AppendEntriesResponse var b bytes.Buffer @@ -117,7 +117,7 @@ func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.P } // Sends RequestVote RPCs to a peer when the server is the candidate. -func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse { +func (t *transporter) SendVoteRequest(server raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse { var rvrsp *raft.RequestVoteResponse var b bytes.Buffer json.NewEncoder(&b).Encode(req) @@ -146,7 +146,7 @@ func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req } // Sends SnapshotRequest RPCs to a peer when the server is the candidate. -func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse { +func (t *transporter) SendSnapshotRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse { var aersp *raft.SnapshotResponse var b bytes.Buffer json.NewEncoder(&b).Encode(req) @@ -177,7 +177,7 @@ func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, } // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate. -func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse { +func (t *transporter) SendSnapshotRecoveryRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse { var aersp *raft.SnapshotRecoveryResponse var b bytes.Buffer json.NewEncoder(&b).Encode(req) diff --git a/store/create_command.go b/store/create_command.go index f13b91790..43c09f998 100644 --- a/store/create_command.go +++ b/store/create_command.go @@ -25,7 +25,7 @@ func (c *CreateCommand) CommandName() string { } // Create node -func (c *CreateCommand) Apply(server *raft.Server) (interface{}, error) { +func (c *CreateCommand) Apply(server raft.Server) (interface{}, error) { s, _ := server.StateMachine().(Store) e, err := s.Create(c.Key, c.Value, c.IncrementalSuffix, c.Force, c.ExpireTime, server.CommitIndex(), server.Term()) diff --git a/store/delete_command.go b/store/delete_command.go index bc84dfc99..6ff3c5c6f 100644 --- a/store/delete_command.go +++ b/store/delete_command.go @@ -21,7 +21,7 @@ func (c *DeleteCommand) CommandName() string { } // Delete the key -func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) { +func (c *DeleteCommand) Apply(server raft.Server) (interface{}, error) { s, _ := server.StateMachine().(Store) e, err := s.Delete(c.Key, c.Recursive, server.CommitIndex(), server.Term()) diff --git a/store/test_and_set_command.go b/store/test_and_set_command.go index 811f713e5..03cb1879a 100644 --- a/store/test_and_set_command.go +++ b/store/test_and_set_command.go @@ -26,7 +26,7 @@ func (c *TestAndSetCommand) CommandName() string { } // Set the key-value pair if the current value of the key equals to the given prevValue -func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) { +func (c *TestAndSetCommand) Apply(server raft.Server) (interface{}, error) { s, _ := server.StateMachine().(Store) e, err := s.TestAndSet(c.Key, c.PrevValue, c.PrevIndex, diff --git a/store/update_command.go b/store/update_command.go index 582fb42fe..3152006bc 100644 --- a/store/update_command.go +++ b/store/update_command.go @@ -24,7 +24,7 @@ func (c *UpdateCommand) CommandName() string { } // Update node -func (c *UpdateCommand) Apply(server *raft.Server) (interface{}, error) { +func (c *UpdateCommand) Apply(server raft.Server) (interface{}, error) { s, _ := server.StateMachine().(Store) e, err := s.Update(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) diff --git a/third_party/github.com/coreos/go-etcd/README.md b/third_party/github.com/coreos/go-etcd/README.md index 7a8f8b34a..9496dace4 100644 --- a/third_party/github.com/coreos/go-etcd/README.md +++ b/third_party/github.com/coreos/go-etcd/README.md @@ -31,19 +31,19 @@ func main() { c := etcd.NewClient() // default binds to http://0.0.0.0:4001 // SET the value "bar" to the key "foo" with zero TTL - // returns a: *store.Response + // returns a: *Response res, _ := c.Set("foo", "bar", 0) fmt.Printf("set response: %+v\n", res) // GET the value that is stored for the key "foo" - // return a slice: []*store.Response + // return a slice: []*Response values, _ := c.Get("foo") for i, res := range values { // .. and print them out fmt.Printf("[%d] get response: %+v\n", i, res) } // DELETE the key "foo" - // returns a: *store.Response + // returns a: *Response res, _ = c.Delete("foo") fmt.Printf("delete response: %+v\n", res) } diff --git a/third_party/github.com/coreos/go-etcd/etcd/delete.go b/third_party/github.com/coreos/go-etcd/etcd/delete.go index fea169560..91f6df87a 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/delete.go +++ b/third_party/github.com/coreos/go-etcd/etcd/delete.go @@ -2,13 +2,12 @@ package etcd import ( "encoding/json" - "github.com/coreos/etcd/store" "io/ioutil" "net/http" "path" ) -func (c *Client) Delete(key string) (*store.Response, error) { +func (c *Client) Delete(key string) (*Response, error) { resp, err := c.sendRequest("DELETE", path.Join("keys", key), "") @@ -28,7 +27,7 @@ func (c *Client) Delete(key string) (*store.Response, error) { return nil, handleError(b) } - var result store.Response + var result Response err = json.Unmarshal(b, &result) diff --git a/third_party/github.com/coreos/go-etcd/etcd/get.go b/third_party/github.com/coreos/go-etcd/etcd/get.go index b0d16fe20..3288621e7 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/get.go +++ b/third_party/github.com/coreos/go-etcd/etcd/get.go @@ -2,13 +2,12 @@ package etcd import ( "encoding/json" - "github.com/coreos/etcd/store" "io/ioutil" "net/http" "path" ) -func (c *Client) Get(key string) ([]*store.Response, error) { +func (c *Client) Get(key string) ([]*Response, error) { logger.Debugf("get %s [%s]", key, c.cluster.Leader) resp, err := c.sendRequest("GET", path.Join("keys", key), "") @@ -36,7 +35,7 @@ func (c *Client) Get(key string) ([]*store.Response, error) { // GetTo gets the value of the key from a given machine address. // If the given machine is not available it returns an error. // Mainly use for testing purpose -func (c *Client) GetFrom(key string, addr string) ([]*store.Response, error) { +func (c *Client) GetFrom(key string, addr string) ([]*Response, error) { httpPath := c.createHttpPath(addr, path.Join(version, "keys", key)) resp, err := c.httpClient.Get(httpPath) @@ -61,10 +60,10 @@ func (c *Client) GetFrom(key string, addr string) ([]*store.Response, error) { } // Convert byte stream to response. -func convertGetResponse(b []byte) ([]*store.Response, error) { +func convertGetResponse(b []byte) ([]*Response, error) { - var results []*store.Response - var result *store.Response + var results []*Response + var result *Response err := json.Unmarshal(b, &result) @@ -76,7 +75,7 @@ func convertGetResponse(b []byte) ([]*store.Response, error) { } } else { - results = make([]*store.Response, 1) + results = make([]*Response, 1) results[0] = result } return results, nil diff --git a/third_party/github.com/coreos/go-etcd/etcd/response.go b/third_party/github.com/coreos/go-etcd/etcd/response.go new file mode 100644 index 000000000..d2311a675 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/response.go @@ -0,0 +1,26 @@ +package etcd + +import ( + "time" +) + +// The response object from the server. +type Response struct { + Action string `json:"action"` + Key string `json:"key"` + Dir bool `json:"dir,omitempty"` + PrevValue string `json:"prevValue,omitempty"` + Value string `json:"value,omitempty"` + + // If the key did not exist before the action, + // this field should be set to true + NewKey bool `json:"newKey,omitempty"` + + Expiration *time.Time `json:"expiration,omitempty"` + + // Time to live in second + TTL int64 `json:"ttl,omitempty"` + + // The command index of the raft machine when the command is executed + Index uint64 `json:"index"` +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/set.go b/third_party/github.com/coreos/go-etcd/etcd/set.go index 78acb9081..17fc415f2 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/set.go +++ b/third_party/github.com/coreos/go-etcd/etcd/set.go @@ -3,14 +3,13 @@ package etcd import ( "encoding/json" "fmt" - "github.com/coreos/etcd/store" "io/ioutil" "net/http" "net/url" "path" ) -func (c *Client) Set(key string, value string, ttl uint64) (*store.Response, error) { +func (c *Client) Set(key string, value string, ttl uint64) (*Response, error) { logger.Debugf("set %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader) v := url.Values{} v.Set("value", value) @@ -45,7 +44,7 @@ func (c *Client) Set(key string, value string, ttl uint64) (*store.Response, err // SetTo sets the value of the key to a given machine address. // If the given machine is not available or is not leader it returns an error // Mainly use for testing purpose. -func (c *Client) SetTo(key string, value string, ttl uint64, addr string) (*store.Response, error) { +func (c *Client) SetTo(key string, value string, ttl uint64, addr string) (*Response, error) { v := url.Values{} v.Set("value", value) @@ -77,8 +76,8 @@ func (c *Client) SetTo(key string, value string, ttl uint64, addr string) (*stor } // Convert byte stream to response. -func convertSetResponse(b []byte) (*store.Response, error) { - var result store.Response +func convertSetResponse(b []byte) (*Response, error) { + var result Response err := json.Unmarshal(b, &result) diff --git a/third_party/github.com/coreos/go-etcd/etcd/testAndSet.go b/third_party/github.com/coreos/go-etcd/etcd/testAndSet.go index 0bd8672ec..bdd8ecb4f 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/testAndSet.go +++ b/third_party/github.com/coreos/go-etcd/etcd/testAndSet.go @@ -3,14 +3,13 @@ package etcd import ( "encoding/json" "fmt" - "github.com/coreos/etcd/store" "io/ioutil" "net/http" "net/url" "path" ) -func (c *Client) TestAndSet(key string, prevValue string, value string, ttl uint64) (*store.Response, bool, error) { +func (c *Client) TestAndSet(key string, prevValue string, value string, ttl uint64) (*Response, bool, error) { logger.Debugf("set %s, %s[%s], ttl: %d, [%s]", key, value, prevValue, ttl, c.cluster.Leader) v := url.Values{} v.Set("value", value) @@ -39,7 +38,7 @@ func (c *Client) TestAndSet(key string, prevValue string, value string, ttl uint return nil, false, handleError(b) } - var result store.Response + var result Response err = json.Unmarshal(b, &result) diff --git a/third_party/github.com/coreos/go-etcd/etcd/watch.go b/third_party/github.com/coreos/go-etcd/etcd/watch.go index 7f59ed065..18fcfdc12 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/watch.go +++ b/third_party/github.com/coreos/go-etcd/etcd/watch.go @@ -4,7 +4,6 @@ import ( "encoding/json" "errors" "fmt" - "github.com/coreos/etcd/store" "io/ioutil" "net/http" "net/url" @@ -28,7 +27,7 @@ var ( // channel. And after someone receive the channel, it will go on to watch that prefix. // If a stop channel is given, client can close long-term watch using the stop channel -func (c *Client) Watch(prefix string, sinceIndex uint64, receiver chan *store.Response, stop chan bool) (*store.Response, error) { +func (c *Client) Watch(prefix string, sinceIndex uint64, receiver chan *Response, stop chan bool) (*Response, error) { logger.Debugf("watch %s [%s]", prefix, c.cluster.Leader) if receiver == nil { return c.watchOnce(prefix, sinceIndex, stop) @@ -50,7 +49,7 @@ func (c *Client) Watch(prefix string, sinceIndex uint64, receiver chan *store.Re // helper func // return when there is change under the given prefix -func (c *Client) watchOnce(key string, sinceIndex uint64, stop chan bool) (*store.Response, error) { +func (c *Client) watchOnce(key string, sinceIndex uint64, stop chan bool) (*Response, error) { var resp *http.Response var err error @@ -94,7 +93,7 @@ func (c *Client) watchOnce(key string, sinceIndex uint64, stop chan bool) (*stor return nil, handleError(b) } - var result store.Response + var result Response err = json.Unmarshal(b, &result) diff --git a/third_party/github.com/coreos/go-etcd/etcd/watch_test.go b/third_party/github.com/coreos/go-etcd/etcd/watch_test.go index 0d9348518..a3d33a4f1 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/watch_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/watch_test.go @@ -2,7 +2,6 @@ package etcd import ( "fmt" - "github.com/coreos/etcd/store" "testing" "time" ) @@ -30,7 +29,7 @@ func TestWatch(t *testing.T) { t.Fatalf("Watch with Index failed with %s %s %v %v", result.Key, result.Value, result.TTL, result.Index) } - ch := make(chan *store.Response, 10) + ch := make(chan *Response, 10) stop := make(chan bool, 1) go setLoop("bar", c) @@ -57,7 +56,7 @@ func setLoop(value string, c *Client) { } } -func receiver(c chan *store.Response, stop chan bool) { +func receiver(c chan *Response, stop chan bool) { for i := 0; i < 10; i++ { <-c } diff --git a/third_party/github.com/coreos/go-raft/README.md b/third_party/github.com/coreos/go-raft/README.md index 6ecc74866..9d189ff6d 100644 --- a/third_party/github.com/coreos/go-raft/README.md +++ b/third_party/github.com/coreos/go-raft/README.md @@ -57,7 +57,7 @@ A distributed consensus protocol is used for maintaining a consistent state acro Many distributed systems are built upon the Paxos protocol but Paxos can be difficult to understand and there are many gaps between Paxos and real world implementation. An alternative is the [Raft distributed consensus protocol][raft-paper] by Diego Ongaro and John Ousterhout. -Raft is a protocol built with understandability as a primary tenant and it centers around two things: +Raft is a protocol built with understandability as a primary tenet and it centers around two things: 1. Leader Election 2. Replicated Log diff --git a/third_party/github.com/coreos/go-raft/command.go b/third_party/github.com/coreos/go-raft/command.go index 2c0495171..9188881b8 100644 --- a/third_party/github.com/coreos/go-raft/command.go +++ b/third_party/github.com/coreos/go-raft/command.go @@ -29,7 +29,7 @@ func init() { // A command represents an action to be taken on the replicated state machine. type Command interface { CommandName() string - Apply(server *Server) (interface{}, error) + Apply(server Server) (interface{}, error) } type CommandEncoder interface { diff --git a/third_party/github.com/coreos/go-raft/http_transporter.go b/third_party/github.com/coreos/go-raft/http_transporter.go index e41fd817f..80aa3e74e 100644 --- a/third_party/github.com/coreos/go-raft/http_transporter.go +++ b/third_party/github.com/coreos/go-raft/http_transporter.go @@ -77,7 +77,7 @@ func (t *HTTPTransporter) RequestVotePath() string { //-------------------------------------- // Applies Raft routes to an HTTP router for a given server. -func (t *HTTPTransporter) Install(server *Server, mux HTTPMuxer) { +func (t *HTTPTransporter) Install(server Server, mux HTTPMuxer) { mux.HandleFunc(t.AppendEntriesPath(), t.appendEntriesHandler(server)) mux.HandleFunc(t.RequestVotePath(), t.requestVoteHandler(server)) } @@ -87,14 +87,14 @@ func (t *HTTPTransporter) Install(server *Server, mux HTTPMuxer) { //-------------------------------------- // Sends an AppendEntries RPC to a peer. -func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse { +func (t *HTTPTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse { var b bytes.Buffer if _, err := req.Encode(&b); err != nil { traceln("transporter.ae.encoding.error:", err) return nil } - url := fmt.Sprintf("http://%s%s", peer.Name, t.AppendEntriesPath()) + url := fmt.Sprintf("%s%s", peer.ConnectionString, t.AppendEntriesPath()) traceln(server.Name(), "POST", url) client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}} @@ -115,14 +115,14 @@ func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, r } // Sends a RequestVote RPC to a peer. -func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse { +func (t *HTTPTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse { var b bytes.Buffer if _, err := req.Encode(&b); err != nil { traceln("transporter.rv.encoding.error:", err) return nil } - url := fmt.Sprintf("http://%s%s", peer.Name, t.RequestVotePath()) + url := fmt.Sprintf("%s%s", peer.ConnectionString, t.RequestVotePath()) traceln(server.Name(), "POST", url) client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}} @@ -143,12 +143,12 @@ func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *Reque } // Sends a SnapshotRequest RPC to a peer. -func (t *HTTPTransporter) SendSnapshotRequest(server *Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse { +func (t *HTTPTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse { return nil } // Sends a SnapshotRequest RPC to a peer. -func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server *Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse { +func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse { return nil } @@ -157,7 +157,7 @@ func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server *Server, peer *Peer //-------------------------------------- // Handles incoming AppendEntries requests. -func (t *HTTPTransporter) appendEntriesHandler(server *Server) http.HandlerFunc { +func (t *HTTPTransporter) appendEntriesHandler(server Server) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { traceln(server.Name(), "RECV /appendEntries") @@ -176,7 +176,7 @@ func (t *HTTPTransporter) appendEntriesHandler(server *Server) http.HandlerFunc } // Handles incoming RequestVote requests. -func (t *HTTPTransporter) requestVoteHandler(server *Server) http.HandlerFunc { +func (t *HTTPTransporter) requestVoteHandler(server Server) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { traceln(server.Name(), "RECV /requestVote") diff --git a/third_party/github.com/coreos/go-raft/http_transporter_test.go b/third_party/github.com/coreos/go-raft/http_transporter_test.go index 3bd4a6d74..ab44a523a 100644 --- a/third_party/github.com/coreos/go-raft/http_transporter_test.go +++ b/third_party/github.com/coreos/go-raft/http_transporter_test.go @@ -14,8 +14,8 @@ func TestHTTPTransporter(t *testing.T) { transporter := NewHTTPTransporter("/raft") transporter.DisableKeepAlives = true - servers := []*Server{} - f0 := func(server *Server, httpServer *http.Server) { + servers := []Server{} + f0 := func(server Server, httpServer *http.Server) { // Stop the leader and wait for an election. server.Stop() time.Sleep(testElectionTimeout * 2) @@ -25,15 +25,15 @@ func TestHTTPTransporter(t *testing.T) { } server.Start() } - f1 := func(server *Server, httpServer *http.Server) { + f1 := func(server Server, httpServer *http.Server) { } - f2 := func(server *Server, httpServer *http.Server) { + f2 := func(server Server, httpServer *http.Server) { } runTestHttpServers(t, &servers, transporter, f0, f1, f2) } // Starts multiple independent Raft servers wrapped with HTTP servers. -func runTestHttpServers(t *testing.T, servers *[]*Server, transporter *HTTPTransporter, callbacks ...func(*Server, *http.Server)) { +func runTestHttpServers(t *testing.T, servers *[]Server, transporter *HTTPTransporter, callbacks ...func(Server, *http.Server)) { var wg sync.WaitGroup httpServers := []*http.Server{} listeners := []net.Listener{} @@ -68,7 +68,7 @@ func runTestHttpServers(t *testing.T, servers *[]*Server, transporter *HTTPTrans // Setup configuration. for _, server := range *servers { - if _, err := (*servers)[0].Do(&DefaultJoinCommand{Name: server.Name()}); err != nil { + if _, err := (*servers)[0].Do(&DefaultJoinCommand{Name: server.Name(), ConnectionString: fmt.Sprintf("http://%s", server.Name())}); err != nil { t.Fatalf("Server %s unable to join: %v", server.Name(), err) } } @@ -94,7 +94,7 @@ func BenchmarkSpeed(b *testing.B) { transporter := NewHTTPTransporter("/raft") transporter.DisableKeepAlives = true - servers := []*Server{} + servers := []Server{} for i := 0; i < 3; i++ { port := 9000 + i @@ -125,7 +125,7 @@ func BenchmarkSpeed(b *testing.B) { // Setup configuration. for _, server := range servers { - (servers)[0].Do(&DefaultJoinCommand{Name: server.Name()}) + (servers)[0].Do(&DefaultJoinCommand{Name: server.Name(), ConnectionString: fmt.Sprintf("http://%s", server.Name())}) } c := make(chan bool) @@ -145,7 +145,7 @@ func BenchmarkSpeed(b *testing.B) { } } -func send(c chan bool, s *Server) { +func send(c chan bool, s Server) { for i := 0; i < 20; i++ { s.Do(&NOPCommand{}) } diff --git a/third_party/github.com/coreos/go-raft/join_command.go b/third_party/github.com/coreos/go-raft/join_command.go index 949eaae76..1a43db2c3 100644 --- a/third_party/github.com/coreos/go-raft/join_command.go +++ b/third_party/github.com/coreos/go-raft/join_command.go @@ -3,7 +3,7 @@ package raft // Join command interface type JoinCommand interface { CommandName() string - Apply(server *Server) (interface{}, error) + Apply(server Server) (interface{}, error) NodeName() string } @@ -18,7 +18,7 @@ func (c *DefaultJoinCommand) CommandName() string { return "raft:join" } -func (c *DefaultJoinCommand) Apply(server *Server) (interface{}, error) { +func (c *DefaultJoinCommand) Apply(server Server) (interface{}, error) { err := server.AddPeer(c.Name, c.ConnectionString) return []byte("join"), err diff --git a/third_party/github.com/coreos/go-raft/leave_command.go b/third_party/github.com/coreos/go-raft/leave_command.go index c2a4923a0..ca2f148cf 100644 --- a/third_party/github.com/coreos/go-raft/leave_command.go +++ b/third_party/github.com/coreos/go-raft/leave_command.go @@ -3,7 +3,7 @@ package raft // Leave command interface type LeaveCommand interface { CommandName() string - Apply(server *Server) (interface{}, error) + Apply(server Server) (interface{}, error) NodeName() string } @@ -17,7 +17,7 @@ func (c *DefaultLeaveCommand) CommandName() string { return "raft:leave" } -func (c *DefaultLeaveCommand) Apply(server *Server) (interface{}, error) { +func (c *DefaultLeaveCommand) Apply(server Server) (interface{}, error) { err := server.RemovePeer(c.Name) return []byte("leave"), err diff --git a/third_party/github.com/coreos/go-raft/nop_command.go b/third_party/github.com/coreos/go-raft/nop_command.go index e3183cdd8..1e5671428 100644 --- a/third_party/github.com/coreos/go-raft/nop_command.go +++ b/third_party/github.com/coreos/go-raft/nop_command.go @@ -13,7 +13,7 @@ func (c NOPCommand) CommandName() string { return "raft:nop" } -func (c NOPCommand) Apply(server *Server) (interface{}, error) { +func (c NOPCommand) Apply(server Server) (interface{}, error) { return nil, nil } diff --git a/third_party/github.com/coreos/go-raft/peer.go b/third_party/github.com/coreos/go-raft/peer.go index 7b116edbb..516b535e4 100644 --- a/third_party/github.com/coreos/go-raft/peer.go +++ b/third_party/github.com/coreos/go-raft/peer.go @@ -13,7 +13,7 @@ import ( // A peer is a reference to another server involved in the consensus protocol. type Peer struct { - server *Server + server *server Name string `json:"name"` ConnectionString string `json:"connectionString"` prevLogIndex uint64 @@ -29,7 +29,7 @@ type Peer struct { //------------------------------------------------------------------------------ // Creates a new peer. -func newPeer(server *Server, name string, connectionString string, heartbeatTimeout time.Duration) *Peer { +func newPeer(server *server, name string, connectionString string, heartbeatTimeout time.Duration) *Peer { return &Peer{ server: server, Name: name, diff --git a/third_party/github.com/coreos/go-raft/server.go b/third_party/github.com/coreos/go-raft/server.go index b9b00e697..d7dec5b23 100644 --- a/third_party/github.com/coreos/go-raft/server.go +++ b/third_party/github.com/coreos/go-raft/server.go @@ -57,7 +57,46 @@ var CommandTimeoutError = errors.New("raft: Command timeout") // A server is involved in the consensus protocol and can act as a follower, // candidate or a leader. -type Server struct { +type Server interface { + Name() string + Context() interface{} + StateMachine() StateMachine + Leader() string + State() string + Path() string + LogPath() string + SnapshotPath(lastIndex uint64, lastTerm uint64) string + Term() uint64 + CommitIndex() uint64 + VotedFor() string + MemberCount() int + QuorumSize() int + IsLogEmpty() bool + LogEntries() []*LogEntry + LastCommandName() string + GetState() string + ElectionTimeout() time.Duration + SetElectionTimeout(duration time.Duration) + HeartbeatTimeout() time.Duration + SetHeartbeatTimeout(duration time.Duration) + Transporter() Transporter + SetTransporter(t Transporter) + AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse + RequestVote(req *RequestVoteRequest) *RequestVoteResponse + RequestSnapshot(req *SnapshotRequest) *SnapshotResponse + SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse + AddPeer(name string, connectiongString string) error + RemovePeer(name string) error + Peers() map[string]*Peer + Start() error + Stop() + Running() bool + Do(command Command) (interface{}, error) + TakeSnapshot() error + LoadSnapshot() error +} + +type server struct { name string path string state string @@ -98,7 +137,7 @@ type event struct { //------------------------------------------------------------------------------ // Creates a new server with a log at the given path. -func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}, connectionString string) (*Server, error) { +func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}, connectionString string) (Server, error) { if name == "" { return nil, errors.New("raft.Server: Name cannot be blank") } @@ -106,7 +145,7 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S panic("raft: Transporter required") } - s := &Server{ + s := &server{ name: name, path: path, transporter: transporter, @@ -142,22 +181,22 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S //-------------------------------------- // Retrieves the name of the server. -func (s *Server) Name() string { +func (s *server) Name() string { return s.name } // Retrieves the storage path for the server. -func (s *Server) Path() string { +func (s *server) Path() string { return s.path } // The name of the current leader. -func (s *Server) Leader() string { +func (s *server) Leader() string { return s.leader } // Retrieves a copy of the peer data. -func (s *Server) Peers() map[string]*Peer { +func (s *server) Peers() map[string]*Peer { s.mutex.Lock() defer s.mutex.Unlock() @@ -169,42 +208,42 @@ func (s *Server) Peers() map[string]*Peer { } // Retrieves the object that transports requests. -func (s *Server) Transporter() Transporter { +func (s *server) Transporter() Transporter { s.mutex.RLock() defer s.mutex.RUnlock() return s.transporter } -func (s *Server) SetTransporter(t Transporter) { +func (s *server) SetTransporter(t Transporter) { s.mutex.Lock() defer s.mutex.Unlock() s.transporter = t } // Retrieves the context passed into the constructor. -func (s *Server) Context() interface{} { +func (s *server) Context() interface{} { return s.context } // Retrieves the state machine passed into the constructor. -func (s *Server) StateMachine() StateMachine { +func (s *server) StateMachine() StateMachine { return s.stateMachine } // Retrieves the log path for the server. -func (s *Server) LogPath() string { +func (s *server) LogPath() string { return path.Join(s.path, "log") } // Retrieves the current state of the server. -func (s *Server) State() string { +func (s *server) State() string { s.mutex.RLock() defer s.mutex.RUnlock() return s.state } // Sets the state of the server. -func (s *Server) setState(state string) { +func (s *server) setState(state string) { s.mutex.Lock() defer s.mutex.Unlock() s.state = state @@ -214,44 +253,44 @@ func (s *Server) setState(state string) { } // Retrieves the current term of the server. -func (s *Server) Term() uint64 { +func (s *server) Term() uint64 { return s.currentTerm } // Retrieves the current commit index of the server. -func (s *Server) CommitIndex() uint64 { +func (s *server) CommitIndex() uint64 { return s.log.commitIndex } // Retrieves the name of the candidate this server voted for in this term. -func (s *Server) VotedFor() string { +func (s *server) VotedFor() string { return s.votedFor } // Retrieves whether the server's log has no entries. -func (s *Server) IsLogEmpty() bool { +func (s *server) IsLogEmpty() bool { return s.log.isEmpty() } // A list of all the log entries. This should only be used for debugging purposes. -func (s *Server) LogEntries() []*LogEntry { +func (s *server) LogEntries() []*LogEntry { return s.log.entries } // A reference to the command name of the last entry. -func (s *Server) LastCommandName() string { +func (s *server) LastCommandName() string { return s.log.lastCommandName() } // Get the state of the server for debugging -func (s *Server) GetState() string { +func (s *server) GetState() string { s.mutex.RLock() defer s.mutex.RUnlock() return fmt.Sprintf("Name: %s, State: %s, Term: %v, CommitedIndex: %v ", s.name, s.state, s.currentTerm, s.log.commitIndex) } // Check if the server is promotable -func (s *Server) promotable() bool { +func (s *server) promotable() bool { return s.log.currentIndex() > 0 } @@ -260,14 +299,14 @@ func (s *Server) promotable() bool { //-------------------------------------- // Retrieves the number of member servers in the consensus. -func (s *Server) MemberCount() int { +func (s *server) MemberCount() int { s.mutex.Lock() defer s.mutex.Unlock() return len(s.peers) + 1 } // Retrieves the number of servers required to make a quorum. -func (s *Server) QuorumSize() int { +func (s *server) QuorumSize() int { return (s.MemberCount() / 2) + 1 } @@ -276,14 +315,14 @@ func (s *Server) QuorumSize() int { //-------------------------------------- // Retrieves the election timeout. -func (s *Server) ElectionTimeout() time.Duration { +func (s *server) ElectionTimeout() time.Duration { s.mutex.RLock() defer s.mutex.RUnlock() return s.electionTimeout } // Sets the election timeout. -func (s *Server) SetElectionTimeout(duration time.Duration) { +func (s *server) SetElectionTimeout(duration time.Duration) { s.mutex.Lock() defer s.mutex.Unlock() s.electionTimeout = duration @@ -294,14 +333,14 @@ func (s *Server) SetElectionTimeout(duration time.Duration) { //-------------------------------------- // Retrieves the heartbeat timeout. -func (s *Server) HeartbeatTimeout() time.Duration { +func (s *server) HeartbeatTimeout() time.Duration { s.mutex.RLock() defer s.mutex.RUnlock() return s.heartbeatTimeout } // Sets the heartbeat timeout. -func (s *Server) SetHeartbeatTimeout(duration time.Duration) { +func (s *server) SetHeartbeatTimeout(duration time.Duration) { s.mutex.Lock() defer s.mutex.Unlock() @@ -334,7 +373,7 @@ func init() { // If no log entries exist and a self-join command is issued then // immediately become leader and commit entry. -func (s *Server) Start() error { +func (s *server) Start() error { // Exit if the server is already running. if s.state != Stopped { return errors.New("raft.Server: Server already running") @@ -380,7 +419,7 @@ func (s *Server) Start() error { } // Shuts down the server. -func (s *Server) Stop() { +func (s *server) Stop() { s.send(&stopValue) s.mutex.Lock() defer s.mutex.Unlock() @@ -388,7 +427,7 @@ func (s *Server) Stop() { } // Checks if the server is currently running. -func (s *Server) Running() bool { +func (s *server) Running() bool { s.mutex.RLock() defer s.mutex.RUnlock() return s.state != Stopped @@ -400,7 +439,7 @@ func (s *Server) Running() bool { // Sets the current term for the server. This is only used when an external // current term is found. -func (s *Server) setCurrentTerm(term uint64, leaderName string, append bool) { +func (s *server) setCurrentTerm(term uint64, leaderName string, append bool) { s.mutex.Lock() defer s.mutex.Unlock() @@ -439,7 +478,7 @@ func (s *Server) setCurrentTerm(term uint64, leaderName string, append bool) { // | new leader | | // |_______________________|____________________________________ | // The main event loop for the server -func (s *Server) loop() { +func (s *server) loop() { defer s.debugln("server.loop.end") for { @@ -467,13 +506,13 @@ func (s *Server) loop() { // Sends an event to the event loop to be processed. The function will wait // until the event is actually processed before returning. -func (s *Server) send(value interface{}) (interface{}, error) { +func (s *server) send(value interface{}) (interface{}, error) { event := s.sendAsync(value) err := <-event.c return event.returnValue, err } -func (s *Server) sendAsync(value interface{}) *event { +func (s *server) sendAsync(value interface{}) *event { event := &event{target: value, c: make(chan error, 1)} s.c <- event return event @@ -484,7 +523,7 @@ func (s *Server) sendAsync(value interface{}) *event { // Converts to candidate if election timeout elapses without either: // 1.Receiving valid AppendEntries RPC, or // 2.Granting vote to candidate -func (s *Server) followerLoop() { +func (s *server) followerLoop() { s.setState(Follower) timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2) @@ -547,7 +586,7 @@ func (s *Server) followerLoop() { } // The event loop that is run when the server is in a Candidate state. -func (s *Server) candidateLoop() { +func (s *server) candidateLoop() { lastLogIndex, lastLogTerm := s.log.lastInfo() s.leader = "" @@ -630,7 +669,7 @@ func (s *Server) candidateLoop() { } // The event loop that is run when the server is in a Leader state. -func (s *Server) leaderLoop() { +func (s *server) leaderLoop() { s.setState(Leader) s.syncedPeer = make(map[string]bool) logIndex, _ := s.log.lastInfo() @@ -682,7 +721,7 @@ func (s *Server) leaderLoop() { s.syncedPeer = nil } -func (s *Server) snapshotLoop() { +func (s *server) snapshotLoop() { s.setState(Snapshotting) for { @@ -721,12 +760,12 @@ func (s *Server) snapshotLoop() { // Attempts to execute a command and replicate it. The function will return // when the command has been successfully committed or an error has occurred. -func (s *Server) Do(command Command) (interface{}, error) { +func (s *server) Do(command Command) (interface{}, error) { return s.send(command) } // Processes a command. -func (s *Server) processCommand(command Command, e *event) { +func (s *server) processCommand(command Command, e *event) { s.debugln("server.command.process") // Create an entry for the command in the log. @@ -779,14 +818,14 @@ func (s *Server) processCommand(command Command, e *event) { //-------------------------------------- // Appends zero or more log entry from the leader to this server. -func (s *Server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse { +func (s *server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse { ret, _ := s.send(req) resp, _ := ret.(*AppendEntriesResponse) return resp } // Processes the "append entries" request. -func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) { +func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) { s.traceln("server.ae.process") @@ -824,7 +863,7 @@ func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) (*Append // Processes the "append entries" response from the peer. This is only // processed when the server is a leader. Responses received during other // states are dropped. -func (s *Server) processAppendEntriesResponse(resp *AppendEntriesResponse) { +func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) { // If we find a higher term then change to a follower and exit. if resp.Term > s.currentTerm { @@ -888,14 +927,14 @@ func (s *Server) processAppendEntriesResponse(resp *AppendEntriesResponse) { // Requests a vote from a server. A vote can be obtained if the vote's term is // at the server's current term and the server has not made a vote yet. A vote // can also be obtained if the term is greater than the server's current term. -func (s *Server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse { +func (s *server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse { ret, _ := s.send(req) resp, _ := ret.(*RequestVoteResponse) return resp } // Processes a "request vote" request. -func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) { +func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) { // If the request is coming from an old term then reject it. if req.Term < s.currentTerm { @@ -933,7 +972,7 @@ func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot //-------------------------------------- // Adds a peer to the server. -func (s *Server) AddPeer(name string, connectiongString string) error { +func (s *server) AddPeer(name string, connectiongString string) error { s.debugln("server.peer.add: ", name, len(s.peers)) // Do not allow peers to be added twice. @@ -959,7 +998,7 @@ func (s *Server) AddPeer(name string, connectiongString string) error { } // Removes a peer from the server. -func (s *Server) RemovePeer(name string) error { +func (s *server) RemovePeer(name string) error { s.debugln("server.peer.remove: ", name, len(s.peers)) // Skip the Peer if it has the same name as the Server @@ -988,7 +1027,7 @@ func (s *Server) RemovePeer(name string) error { // Log compaction //-------------------------------------- -func (s *Server) TakeSnapshot() error { +func (s *server) TakeSnapshot() error { //TODO put a snapshot mutex s.debugln("take Snapshot") if s.currentSnapshot != nil { @@ -1047,7 +1086,7 @@ func (s *Server) TakeSnapshot() error { } // Retrieves the log path for the server. -func (s *Server) saveSnapshot() error { +func (s *server) saveSnapshot() error { if s.currentSnapshot == nil { return errors.New("no snapshot to save") @@ -1071,17 +1110,17 @@ func (s *Server) saveSnapshot() error { } // Retrieves the log path for the server. -func (s *Server) SnapshotPath(lastIndex uint64, lastTerm uint64) string { +func (s *server) SnapshotPath(lastIndex uint64, lastTerm uint64) string { return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex)) } -func (s *Server) RequestSnapshot(req *SnapshotRequest) *SnapshotResponse { +func (s *server) RequestSnapshot(req *SnapshotRequest) *SnapshotResponse { ret, _ := s.send(req) resp, _ := ret.(*SnapshotResponse) return resp } -func (s *Server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse { +func (s *server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse { // If the follower’s log contains an entry at the snapshot’s last index with a term // that matches the snapshot’s last term @@ -1099,13 +1138,13 @@ func (s *Server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse return newSnapshotResponse(true) } -func (s *Server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse { +func (s *server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse { ret, _ := s.send(req) resp, _ := ret.(*SnapshotRecoveryResponse) return resp } -func (s *Server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse { +func (s *server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse { s.stateMachine.Recovery(req.State) @@ -1136,7 +1175,7 @@ func (s *Server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *S } // Load a snapshot at restart -func (s *Server) LoadSnapshot() error { +func (s *server) LoadSnapshot() error { dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0) if err != nil { @@ -1221,7 +1260,7 @@ func (s *Server) LoadSnapshot() error { // Config File //-------------------------------------- -func (s *Server) writeConf() { +func (s *server) writeConf() { peers := make([]*Peer, len(s.peers)) @@ -1251,7 +1290,7 @@ func (s *Server) writeConf() { } // Read the configuration for the server. -func (s *Server) readConf() error { +func (s *server) readConf() error { confPath := path.Join(s.path, "conf") s.debugln("readConf.open ", confPath) @@ -1277,10 +1316,10 @@ func (s *Server) readConf() error { // Debugging //-------------------------------------- -func (s *Server) debugln(v ...interface{}) { +func (s *server) debugln(v ...interface{}) { debugf("[%s Term:%d] %s", s.name, s.currentTerm, fmt.Sprintln(v...)) } -func (s *Server) traceln(v ...interface{}) { +func (s *server) traceln(v ...interface{}) { tracef("[%s] %s", s.name, fmt.Sprintln(v...)) } diff --git a/third_party/github.com/coreos/go-raft/server_test.go b/third_party/github.com/coreos/go-raft/server_test.go index 01bc96b71..792ed90f3 100644 --- a/third_party/github.com/coreos/go-raft/server_test.go +++ b/third_party/github.com/coreos/go-raft/server_test.go @@ -37,40 +37,40 @@ func TestServerRequestVote(t *testing.T) { // // Ensure that a vote request is denied if it comes from an old term. func TestServerRequestVoteDeniedForStaleTerm(t *testing.T) { - server := newTestServer("1", &testTransporter{}) + s := newTestServer("1", &testTransporter{}) - server.Start() - if _, err := server.Do(&DefaultJoinCommand{Name: server.Name()}); err != nil { - t.Fatalf("Server %s unable to join: %v", server.Name(), err) + s.Start() + if _, err := s.Do(&DefaultJoinCommand{Name: s.Name()}); err != nil { + t.Fatalf("Server %s unable to join: %v", s.Name(), err) } - server.currentTerm = 2 - defer server.Stop() - resp := server.RequestVote(newRequestVoteRequest(1, "foo", 1, 0)) + s.(*server).currentTerm = 2 + defer s.Stop() + resp := s.RequestVote(newRequestVoteRequest(1, "foo", 1, 0)) if resp.Term != 2 || resp.VoteGranted { t.Fatalf("Invalid request vote response: %v/%v", resp.Term, resp.VoteGranted) } - if server.currentTerm != 2 && server.State() != Follower { - t.Fatalf("Server did not update term and demote: %v / %v", server.currentTerm, server.State()) + if s.Term() != 2 && s.State() != Follower { + t.Fatalf("Server did not update term and demote: %v / %v", s.Term(), s.State()) } } // Ensure that a vote request is denied if we've already voted for a different candidate. func TestServerRequestVoteDeniedIfAlreadyVoted(t *testing.T) { - server := newTestServer("1", &testTransporter{}) + s := newTestServer("1", &testTransporter{}) - server.Start() - if _, err := server.Do(&DefaultJoinCommand{Name: server.Name()}); err != nil { - t.Fatalf("Server %s unable to join: %v", server.Name(), err) + s.Start() + if _, err := s.Do(&DefaultJoinCommand{Name: s.Name()}); err != nil { + t.Fatalf("Server %s unable to join: %v", s.Name(), err) } - server.currentTerm = 2 - defer server.Stop() - resp := server.RequestVote(newRequestVoteRequest(2, "foo", 1, 0)) + s.(*server).currentTerm = 2 + defer s.Stop() + resp := s.RequestVote(newRequestVoteRequest(2, "foo", 1, 0)) if resp.Term != 2 || !resp.VoteGranted { t.Fatalf("First vote should not have been denied") } - resp = server.RequestVote(newRequestVoteRequest(2, "bar", 1, 0)) + resp = s.RequestVote(newRequestVoteRequest(2, "bar", 1, 0)) if resp.Term != 2 || resp.VoteGranted { t.Fatalf("Second vote should have been denied") } @@ -78,24 +78,24 @@ func TestServerRequestVoteDeniedIfAlreadyVoted(t *testing.T) { // Ensure that a vote request is approved if vote occurs in a new term. func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) { - server := newTestServer("1", &testTransporter{}) + s := newTestServer("1", &testTransporter{}) - server.Start() - if _, err := server.Do(&DefaultJoinCommand{Name: server.Name()}); err != nil { - t.Fatalf("Server %s unable to join: %v", server.Name(), err) + s.Start() + if _, err := s.Do(&DefaultJoinCommand{Name: s.Name()}); err != nil { + t.Fatalf("Server %s unable to join: %v", s.Name(), err) } time.Sleep(time.Millisecond * 100) - server.currentTerm = 2 - defer server.Stop() - resp := server.RequestVote(newRequestVoteRequest(2, "foo", 2, 1)) - if resp.Term != 2 || !resp.VoteGranted || server.VotedFor() != "foo" { + s.(*server).currentTerm = 2 + defer s.Stop() + resp := s.RequestVote(newRequestVoteRequest(2, "foo", 2, 1)) + if resp.Term != 2 || !resp.VoteGranted || s.VotedFor() != "foo" { t.Fatalf("First vote should not have been denied") } - resp = server.RequestVote(newRequestVoteRequest(3, "bar", 2, 1)) + resp = s.RequestVote(newRequestVoteRequest(3, "bar", 2, 1)) - if resp.Term != 3 || !resp.VoteGranted || server.VotedFor() != "bar" { + if resp.Term != 3 || !resp.VoteGranted || s.VotedFor() != "bar" { t.Fatalf("Second vote should have been approved") } } @@ -106,33 +106,32 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) { e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20}) e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100}) e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0}) - server := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0, e1, e2}) + s := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0, e1, e2}) // start as a follower with term 2 and index 3 - server.Start() - - defer server.Stop() + s.Start() + defer s.Stop() // request vote from term 3 with last log entry 2, 2 - resp := server.RequestVote(newRequestVoteRequest(3, "foo", 2, 2)) + resp := s.RequestVote(newRequestVoteRequest(3, "foo", 2, 2)) if resp.Term != 3 || resp.VoteGranted { t.Fatalf("Stale index vote should have been denied [%v/%v]", resp.Term, resp.VoteGranted) } // request vote from term 2 with last log entry 2, 3 - resp = server.RequestVote(newRequestVoteRequest(2, "foo", 3, 2)) + resp = s.RequestVote(newRequestVoteRequest(2, "foo", 3, 2)) if resp.Term != 3 || resp.VoteGranted { t.Fatalf("Stale term vote should have been denied [%v/%v]", resp.Term, resp.VoteGranted) } // request vote from term 3 with last log entry 2, 3 - resp = server.RequestVote(newRequestVoteRequest(3, "foo", 3, 2)) + resp = s.RequestVote(newRequestVoteRequest(3, "foo", 3, 2)) if resp.Term != 3 || !resp.VoteGranted { t.Fatalf("Matching log vote should have been granted") } // request vote from term 3 with last log entry 2, 4 - resp = server.RequestVote(newRequestVoteRequest(3, "foo", 4, 2)) + resp = s.RequestVote(newRequestVoteRequest(3, "foo", 4, 2)) if resp.Term != 3 || !resp.VoteGranted { t.Fatalf("Ahead-of-log vote should have been granted") } @@ -145,28 +144,27 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) { // // Ensure that we can self-promote a server to candidate, obtain votes and become a fearless leader. func TestServerPromoteSelf(t *testing.T) { e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20}) - server := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0}) + s := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0}) // start as a follower - server.Start() - - defer server.Stop() + s.Start() + defer s.Stop() time.Sleep(2 * testElectionTimeout) - if server.State() != Leader { - t.Fatalf("Server self-promotion failed: %v", server.State()) + if s.State() != Leader { + t.Fatalf("Server self-promotion failed: %v", s.State()) } } //Ensure that we can promote a server within a cluster to a leader. func TestServerPromote(t *testing.T) { - lookup := map[string]*Server{} + lookup := map[string]Server{} transporter := &testTransporter{} - transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse { + transporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse { return lookup[peer.Name].RequestVote(req) } - transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse { + transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse { return lookup[peer.Name].AppendEntries(req) } servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup) @@ -180,8 +178,8 @@ func TestServerPromote(t *testing.T) { if servers[0].State() != Leader && servers[1].State() != Leader && servers[2].State() != Leader { t.Fatalf("No leader elected: (%s, %s, %s)", servers[0].State(), servers[1].State(), servers[2].State()) } - for _, server := range servers { - server.Stop() + for _, s := range servers { + s.Stop() } } @@ -191,20 +189,20 @@ func TestServerPromote(t *testing.T) { // Ensure we can append entries to a server. func TestServerAppendEntries(t *testing.T) { - server := newTestServer("1", &testTransporter{}) + s := newTestServer("1", &testTransporter{}) - server.SetHeartbeatTimeout(time.Second * 10) - server.Start() - defer server.Stop() + s.SetHeartbeatTimeout(time.Second * 10) + s.Start() + defer s.Stop() // Append single entry. e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10}) entries := []*LogEntry{e} - resp := server.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries)) + resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries)) if resp.Term != 1 || !resp.Success { t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success) } - if index, term := server.log.commitInfo(); index != 0 || term != 0 { + if index, term := s.(*server).log.commitInfo(); index != 0 || term != 0 { t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term) } @@ -212,57 +210,56 @@ func TestServerAppendEntries(t *testing.T) { e1, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "bar", I: 20}) e2, _ := newLogEntry(nil, 3, 1, &testCommand1{Val: "baz", I: 30}) entries = []*LogEntry{e1, e2} - resp = server.AppendEntries(newAppendEntriesRequest(1, 1, 1, 1, "ldr", entries)) + resp = s.AppendEntries(newAppendEntriesRequest(1, 1, 1, 1, "ldr", entries)) if resp.Term != 1 || !resp.Success { t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success) } - if index, term := server.log.commitInfo(); index != 1 || term != 1 { + if index, term := s.(*server).log.commitInfo(); index != 1 || term != 1 { t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term) } // Send zero entries and commit everything. - resp = server.AppendEntries(newAppendEntriesRequest(2, 3, 1, 3, "ldr", []*LogEntry{})) + resp = s.AppendEntries(newAppendEntriesRequest(2, 3, 1, 3, "ldr", []*LogEntry{})) if resp.Term != 2 || !resp.Success { t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success) } - if index, term := server.log.commitInfo(); index != 3 || term != 1 { + if index, term := s.(*server).log.commitInfo(); index != 3 || term != 1 { t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term) } } //Ensure that entries with stale terms are rejected. func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) { - server := newTestServer("1", &testTransporter{}) + s := newTestServer("1", &testTransporter{}) - server.Start() + s.Start() - defer server.Stop() - server.currentTerm = 2 + defer s.Stop() + s.(*server).currentTerm = 2 // Append single entry. e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10}) entries := []*LogEntry{e} - resp := server.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries)) + resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries)) if resp.Term != 2 || resp.Success { t.Fatalf("AppendEntries should have failed: %v/%v", resp.Term, resp.Success) } - if index, term := server.log.commitInfo(); index != 0 || term != 0 { + if index, term := s.(*server).log.commitInfo(); index != 0 || term != 0 { t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term) } } // Ensure that we reject entries if the commit log is different. func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) { - server := newTestServer("1", &testTransporter{}) - server.Start() - - defer server.Stop() + s := newTestServer("1", &testTransporter{}) + s.Start() + defer s.Stop() // Append single entry + commit. e1, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10}) e2, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "foo", I: 15}) entries := []*LogEntry{e1, e2} - resp := server.AppendEntries(newAppendEntriesRequest(1, 0, 0, 2, "ldr", entries)) + resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 2, "ldr", entries)) if resp.Term != 1 || !resp.Success { t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success) } @@ -270,7 +267,7 @@ func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) { // Append entry again (post-commit). e, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "bar", I: 20}) entries = []*LogEntry{e} - resp = server.AppendEntries(newAppendEntriesRequest(1, 2, 1, 1, "ldr", entries)) + resp = s.AppendEntries(newAppendEntriesRequest(1, 2, 1, 1, "ldr", entries)) if resp.Term != 1 || resp.Success { t.Fatalf("AppendEntries should have failed: %v/%v", resp.Term, resp.Success) } @@ -278,9 +275,9 @@ func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) { // Ensure that we uncommitted entries are rolled back if new entries overwrite them. func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) { - server := newTestServer("1", &testTransporter{}) - server.Start() - defer server.Stop() + s := newTestServer("1", &testTransporter{}) + s.Start() + defer s.Stop() entry1, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10}) entry2, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "foo", I: 15}) @@ -288,15 +285,15 @@ func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) { // Append single entry + commit. entries := []*LogEntry{entry1, entry2} - resp := server.AppendEntries(newAppendEntriesRequest(1, 0, 0, 1, "ldr", entries)) - if resp.Term != 1 || !resp.Success || server.log.commitIndex != 1 || !reflect.DeepEqual(server.log.entries, []*LogEntry{entry1, entry2}) { + resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 1, "ldr", entries)) + if resp.Term != 1 || !resp.Success || s.(*server).log.commitIndex != 1 || !reflect.DeepEqual(s.(*server).log.entries, []*LogEntry{entry1, entry2}) { t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success) } // Append entry that overwrites the second (uncommitted) entry. entries = []*LogEntry{entry3} - resp = server.AppendEntries(newAppendEntriesRequest(2, 1, 1, 2, "ldr", entries)) - if resp.Term != 2 || !resp.Success || server.log.commitIndex != 2 || !reflect.DeepEqual(server.log.entries, []*LogEntry{entry1, entry3}) { + resp = s.AppendEntries(newAppendEntriesRequest(2, 1, 1, 2, "ldr", entries)) + if resp.Term != 2 || !resp.Success || s.(*server).log.commitIndex != 2 || !reflect.DeepEqual(s.(*server).log.entries, []*LogEntry{entry1, entry3}) { t.Fatalf("AppendEntries should have succeeded: %v/%v", resp.Term, resp.Success) } } @@ -307,11 +304,11 @@ func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) { // Ensure that a follower cannot execute a command. func TestServerDenyCommandExecutionWhenFollower(t *testing.T) { - server := newTestServer("1", &testTransporter{}) - server.Start() - defer server.Stop() + s := newTestServer("1", &testTransporter{}) + s.Start() + defer s.Stop() var err error - if _, err = server.Do(&testCommand1{Val: "foo", I: 10}); err != NotLeaderError { + if _, err = s.Do(&testCommand1{Val: "foo", I: 10}); err != NotLeaderError { t.Fatalf("Expected error: %v, got: %v", NotLeaderError, err) } } @@ -324,27 +321,27 @@ func TestServerDenyCommandExecutionWhenFollower(t *testing.T) { func TestServerRecoverFromPreviousLogAndConf(t *testing.T) { // Initialize the servers. var mutex sync.RWMutex - servers := map[string]*Server{} + servers := map[string]Server{} transporter := &testTransporter{} - transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse { + transporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse { mutex.RLock() - s := servers[peer.Name] + target := servers[peer.Name] mutex.RUnlock() - return s.RequestVote(req) + return target.RequestVote(req) } - transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse { + transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse { mutex.RLock() - s := servers[peer.Name] + target := servers[peer.Name] mutex.RUnlock() - return s.AppendEntries(req) + return target.AppendEntries(req) } disTransporter := &testTransporter{} - disTransporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse { + disTransporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse { return nil } - disTransporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse { + disTransporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse { return nil } @@ -358,22 +355,22 @@ func TestServerRecoverFromPreviousLogAndConf(t *testing.T) { names = append(names, strconv.Itoa(i)) } - var leader *Server + var leader Server for _, name := range names { - server := newTestServer(name, transporter) + s := newTestServer(name, transporter) - servers[name] = server - paths[name] = server.Path() + servers[name] = s + paths[name] = s.Path() if name == "1" { - leader = server - server.SetHeartbeatTimeout(testHeartbeatTimeout) - server.Start() + leader = s + s.SetHeartbeatTimeout(testHeartbeatTimeout) + s.Start() time.Sleep(testHeartbeatTimeout) } else { - server.SetElectionTimeout(testElectionTimeout) - server.SetHeartbeatTimeout(testHeartbeatTimeout) - server.Start() + s.SetElectionTimeout(testElectionTimeout) + s.SetHeartbeatTimeout(testHeartbeatTimeout) + s.Start() time.Sleep(testHeartbeatTimeout) } if _, err := leader.Do(&DefaultJoinCommand{Name: name}); err != nil { @@ -385,35 +382,35 @@ func TestServerRecoverFromPreviousLogAndConf(t *testing.T) { // commit some commands for i := 0; i < 10; i++ { if _, err := leader.Do(&testCommand2{X: 1}); err != nil { - t.Fatalf("cannot commit command:", err.Error()) + t.Fatalf("cannot commit command: %s", err.Error()) } } time.Sleep(2 * testHeartbeatTimeout) for _, name := range names { - server := servers[name] - if server.CommitIndex() != 16 { - t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 16) + s := servers[name] + if s.CommitIndex() != 16 { + t.Fatalf("%s commitIndex is invalid [%d/%d]", name, s.CommitIndex(), 16) } - server.Stop() + s.Stop() } for _, name := range names { // with old path and disable transportation - server := newTestServerWithPath(name, disTransporter, paths[name]) - servers[name] = server + s := newTestServerWithPath(name, disTransporter, paths[name]) + servers[name] = s - server.Start() + s.Start() // should only commit to the last join command - if server.CommitIndex() != 6 { - t.Fatalf("%s recover phase 1 commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 6) + if s.CommitIndex() != 6 { + t.Fatalf("%s recover phase 1 commitIndex is invalid [%d/%d]", name, s.CommitIndex(), 6) } // peer conf should be recovered - if len(server.Peers()) != 4 { - t.Fatalf("%s recover phase 1 peer failed! [%d/%d]", name, len(server.Peers()), 4) + if len(s.Peers()) != 4 { + t.Fatalf("%s recover phase 1 peer failed! [%d/%d]", name, len(s.Peers()), 4) } } @@ -426,11 +423,11 @@ func TestServerRecoverFromPreviousLogAndConf(t *testing.T) { // should commit to the previous index + 1(nop command when new leader elected) for _, name := range names { - server := servers[name] - if server.CommitIndex() != 17 { - t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 17) + s := servers[name] + if s.CommitIndex() != 17 { + t.Fatalf("%s commitIndex is invalid [%d/%d]", name, s.CommitIndex(), 17) } - server.Stop() + s.Stop() } } @@ -440,29 +437,29 @@ func TestServerRecoverFromPreviousLogAndConf(t *testing.T) { // Ensure that we can start a single server and append to its log. func TestServerSingleNode(t *testing.T) { - server := newTestServer("1", &testTransporter{}) - if server.State() != Stopped { - t.Fatalf("Unexpected server state: %v", server.State()) + s := newTestServer("1", &testTransporter{}) + if s.State() != Stopped { + t.Fatalf("Unexpected server state: %v", s.State()) } - server.Start() + s.Start() time.Sleep(testHeartbeatTimeout) // Join the server to itself. - if _, err := server.Do(&DefaultJoinCommand{Name: "1"}); err != nil { + if _, err := s.Do(&DefaultJoinCommand{Name: "1"}); err != nil { t.Fatalf("Unable to join: %v", err) } debugln("finish command") - if server.State() != Leader { - t.Fatalf("Unexpected server state: %v", server.State()) + if s.State() != Leader { + t.Fatalf("Unexpected server state: %v", s.State()) } - server.Stop() + s.Stop() - if server.State() != Stopped { - t.Fatalf("Unexpected server state: %v", server.State()) + if s.State() != Stopped { + t.Fatalf("Unexpected server state: %v", s.State()) } } @@ -470,27 +467,27 @@ func TestServerSingleNode(t *testing.T) { func TestServerMultiNode(t *testing.T) { // Initialize the servers. var mutex sync.RWMutex - servers := map[string]*Server{} + servers := map[string]Server{} transporter := &testTransporter{} - transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse { + transporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse { mutex.RLock() - s := servers[peer.Name] + target := servers[peer.Name] mutex.RUnlock() - return s.RequestVote(req) + return target.RequestVote(req) } - transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse { + transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse { mutex.RLock() - s := servers[peer.Name] + target := servers[peer.Name] mutex.RUnlock() - return s.AppendEntries(req) + return target.AppendEntries(req) } disTransporter := &testTransporter{} - disTransporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse { + disTransporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse { return nil } - disTransporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse { + disTransporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse { return nil } @@ -503,24 +500,24 @@ func TestServerMultiNode(t *testing.T) { names = append(names, strconv.Itoa(i)) } - var leader *Server + var leader Server for _, name := range names { - server := newTestServer(name, transporter) - defer server.Stop() + s := newTestServer(name, transporter) + defer s.Stop() mutex.Lock() - servers[name] = server + servers[name] = s mutex.Unlock() if name == "1" { - leader = server - server.SetHeartbeatTimeout(testHeartbeatTimeout) - server.Start() + leader = s + s.SetHeartbeatTimeout(testHeartbeatTimeout) + s.Start() time.Sleep(testHeartbeatTimeout) } else { - server.SetElectionTimeout(testElectionTimeout) - server.SetHeartbeatTimeout(testHeartbeatTimeout) - server.Start() + s.SetElectionTimeout(testElectionTimeout) + s.SetHeartbeatTimeout(testHeartbeatTimeout) + s.Start() time.Sleep(testHeartbeatTimeout) } if _, err := leader.Do(&DefaultJoinCommand{Name: name}); err != nil { @@ -536,7 +533,7 @@ func TestServerMultiNode(t *testing.T) { t.Fatalf("Expected member count to be %v, got %v", n, leader.MemberCount()) } if servers["2"].State() == Leader || servers["3"].State() == Leader { - t.Fatalf("Expected leader should be 1: 2=%v, 3=%v\n", servers["2"].state, servers["3"].state) + t.Fatalf("Expected leader should be 1: 2=%v, 3=%v\n", servers["2"].State(), servers["3"].State()) } mutex.RUnlock() @@ -573,7 +570,7 @@ func TestServerMultiNode(t *testing.T) { } debugln("[Test] Done") } - debugln("Leader is ", value.Name(), " Index ", value.log.commitIndex) + debugln("Leader is ", value.Name(), " Index ", value.(*server).log.commitIndex) } debugln("Not Found leader") } @@ -584,7 +581,7 @@ func TestServerMultiNode(t *testing.T) { if value.State() == Leader { leader++ } - debugln(value.Name(), " ", value.currentTerm, " ", value.state) + debugln(value.Name(), " ", value.(*server).Term(), " ", value.State()) } } diff --git a/third_party/github.com/coreos/go-raft/snapshot.go b/third_party/github.com/coreos/go-raft/snapshot.go index 93b1a97cd..4f416f741 100644 --- a/third_party/github.com/coreos/go-raft/snapshot.go +++ b/third_party/github.com/coreos/go-raft/snapshot.go @@ -20,9 +20,9 @@ type Snapshot struct { LastIndex uint64 `json:"lastIndex"` LastTerm uint64 `json:"lastTerm"` // cluster configuration. - Peers []*Peer `json: "peers"` - State []byte `json: "state"` - Path string `json: "path"` + Peers []*Peer `json:"peers"` + State []byte `json:"state"` + Path string `json:"path"` } // Save the snapshot to a file diff --git a/third_party/github.com/coreos/go-raft/snapshot_recovery_request.go b/third_party/github.com/coreos/go-raft/snapshot_recovery_request.go index a05f43108..275ff41f3 100644 --- a/third_party/github.com/coreos/go-raft/snapshot_recovery_request.go +++ b/third_party/github.com/coreos/go-raft/snapshot_recovery_request.go @@ -80,7 +80,7 @@ func (req *SnapshotRecoveryRequest) Decode(r io.Reader) (int, error) { req.LeaderName = pb.GetLeaderName() req.LastIndex = pb.GetLastIndex() req.LastTerm = pb.GetLastTerm() - req.State = req.State + req.State = pb.GetState() req.Peers = make([]*Peer, len(pb.Peers)) diff --git a/third_party/github.com/coreos/go-raft/test.go b/third_party/github.com/coreos/go-raft/test.go index 95a6c7168..5b323f749 100644 --- a/third_party/github.com/coreos/go-raft/test.go +++ b/third_party/github.com/coreos/go-raft/test.go @@ -60,7 +60,7 @@ func setupLog(entries []*LogEntry) (*Log, string) { // Servers //-------------------------------------- -func newTestServer(name string, transporter Transporter) *Server { +func newTestServer(name string, transporter Transporter) Server { p, _ := ioutil.TempDir("", "raft-server-") if err := os.MkdirAll(p, 0644); err != nil { panic(err.Error()) @@ -69,12 +69,12 @@ func newTestServer(name string, transporter Transporter) *Server { return server } -func newTestServerWithPath(name string, transporter Transporter, p string) *Server { +func newTestServerWithPath(name string, transporter Transporter, p string) Server { server, _ := NewServer(name, p, transporter, nil, nil, "") return server } -func newTestServerWithLog(name string, transporter Transporter, entries []*LogEntry) *Server { +func newTestServerWithLog(name string, transporter Transporter, entries []*LogEntry) Server { server := newTestServer(name, transporter) f, err := os.Create(server.LogPath()) if err != nil { @@ -88,8 +88,8 @@ func newTestServerWithLog(name string, transporter Transporter, entries []*LogEn return server } -func newTestCluster(names []string, transporter Transporter, lookup map[string]*Server) []*Server { - servers := []*Server{} +func newTestCluster(names []string, transporter Transporter, lookup map[string]Server) []Server { + servers := []Server{} e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20}) for _, name := range names { @@ -116,24 +116,24 @@ func newTestCluster(names []string, transporter Transporter, lookup map[string]* //-------------------------------------- type testTransporter struct { - sendVoteRequestFunc func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse - sendAppendEntriesRequestFunc func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse - sendSnapshotRequestFunc func(server *Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse + sendVoteRequestFunc func(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse + sendAppendEntriesRequestFunc func(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse + sendSnapshotRequestFunc func(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse } -func (t *testTransporter) SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse { +func (t *testTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse { return t.sendVoteRequestFunc(server, peer, req) } -func (t *testTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse { +func (t *testTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse { return t.sendAppendEntriesRequestFunc(server, peer, req) } -func (t *testTransporter) SendSnapshotRequest(server *Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse { +func (t *testTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse { return t.sendSnapshotRequestFunc(server, peer, req) } -func (t *testTransporter) SendSnapshotRecoveryRequest(server *Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse { +func (t *testTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse { return t.SendSnapshotRecoveryRequest(server, peer, req) } @@ -163,7 +163,7 @@ func (c *testCommand1) CommandName() string { return "cmd_1" } -func (c *testCommand1) Apply(server *Server) (interface{}, error) { +func (c *testCommand1) Apply(server Server) (interface{}, error) { return nil, nil } @@ -179,6 +179,6 @@ func (c *testCommand2) CommandName() string { return "cmd_2" } -func (c *testCommand2) Apply(server *Server) (interface{}, error) { +func (c *testCommand2) Apply(server Server) (interface{}, error) { return nil, nil } diff --git a/third_party/github.com/coreos/go-raft/transporter.go b/third_party/github.com/coreos/go-raft/transporter.go index f7d51e527..faf95edd7 100644 --- a/third_party/github.com/coreos/go-raft/transporter.go +++ b/third_party/github.com/coreos/go-raft/transporter.go @@ -9,8 +9,8 @@ package raft // Transporter is the interface for allowing the host application to transport // requests to other nodes. type Transporter interface { - SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse - SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse - SendSnapshotRequest(server *Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse - SendSnapshotRecoveryRequest(server *Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse + SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse + SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse + SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse + SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse } diff --git a/web/web.go b/web/web.go index 1ce9d3fe5..723eb05c8 100644 --- a/web/web.go +++ b/web/web.go @@ -23,7 +23,7 @@ func mainHandler(c http.ResponseWriter, req *http.Request) { mainTempl.Execute(c, p) } -func Start(raftServer *raft.Server, webURL string) { +func Start(raftServer raft.Server, webURL string) { u, _ := url.Parse(webURL) webMux := http.NewServeMux()