From 255e14a5c416b06a84f6a4dd76f189e867ba01ea Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 9 Oct 2013 20:51:21 -0700 Subject: [PATCH] refactor command.go,server.go: add raftWrapper as context, totally get rid of reference in command.go --- command.go | 12 +++++++----- raft_server.go | 22 ++++++++++++---------- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/command.go b/command.go index 79affd7d9..0a330c232 100644 --- a/command.go +++ b/command.go @@ -192,12 +192,12 @@ type JoinCommand struct { EtcdURL string `json:"etcdURL"` } -func newJoinCommand() *JoinCommand { +func newJoinCommand(version, name, raftUrl, etcdUrl string) *JoinCommand { return &JoinCommand{ - RaftVersion: r.version, - Name: r.name, - RaftURL: r.url, - EtcdURL: e.url, + RaftVersion: version, + Name: name, + RaftURL: raftUrl, + EtcdURL: etcdUrl, } } @@ -209,6 +209,7 @@ func (c *JoinCommand) CommandName() string { // Join a server to the cluster func (c *JoinCommand) Apply(server *raft.Server) (interface{}, error) { s, _ := server.StateMachine().(*store.Store) + r, _ := server.Context().(*raftServer) // check if the join command is from a previous machine, who lost all its previous log. e, _ := s.Get(path.Join("/_etcd/machines", c.Name), false, false, server.CommitIndex(), server.Term()) @@ -263,6 +264,7 @@ func (c *RemoveCommand) CommandName() string { // Remove a server from the cluster func (c *RemoveCommand) Apply(server *raft.Server) (interface{}, error) { s, _ := server.StateMachine().(*store.Store) + r, _ := server.Context().(*raftServer) // remove machine in etcd storage key := path.Join("_etcd/machines", c.Name) diff --git a/raft_server.go b/raft_server.go index efbf41aca..8e48c027e 100644 --- a/raft_server.go +++ b/raft_server.go @@ -35,13 +35,7 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi // Create transporter for raft raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client) - // Create raft server - server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, nil, "") - - check(err) - - return &raftServer{ - Server: server, + raftWrapper := &raftServer{ version: raftVersion, name: name, url: url, @@ -62,6 +56,14 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi }, }, } + + // Create raft server + server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, raftWrapper, "") + check(err) + + raftWrapper.Server = server + + return raftWrapper } // Start the raft server @@ -127,7 +129,7 @@ func (r *raftServer) ListenAndServe() { func startAsLeader() { // leader need to join self as a peer for { - _, err := r.Do(newJoinCommand()) + _, err := r.Do(newJoinCommand(r.version, r.Name(), r.url, e.url)) if err == nil { break } @@ -243,7 +245,7 @@ func joinByMachine(s *raft.Server, machine string, scheme string) error { return fmt.Errorf("Unable to join: internal version mismatch, entire cluster must be running identical versions of etcd") } - json.NewEncoder(&b).Encode(newJoinCommand()) + json.NewEncoder(&b).Encode(newJoinCommand(r.version, r.Name(), r.url, e.url)) joinURL := url.URL{Host: machine, Scheme: scheme, Path: "/join"} @@ -270,7 +272,7 @@ func joinByMachine(s *raft.Server, machine string, scheme string) error { address := resp.Header.Get("Location") debugf("Send Join Request to %s", address) - json.NewEncoder(&b).Encode(newJoinCommand()) + json.NewEncoder(&b).Encode(newJoinCommand(r.version, r.Name(), r.url, e.url)) resp, req, err = t.Post(address, &b)