mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
refactor command.go,server.go: add raftWrapper as context, totally get rid of reference in command.go
This commit is contained in:
parent
61899d62c5
commit
255e14a5c4
12
command.go
12
command.go
@ -192,12 +192,12 @@ type JoinCommand struct {
|
||||
EtcdURL string `json:"etcdURL"`
|
||||
}
|
||||
|
||||
func newJoinCommand() *JoinCommand {
|
||||
func newJoinCommand(version, name, raftUrl, etcdUrl string) *JoinCommand {
|
||||
return &JoinCommand{
|
||||
RaftVersion: r.version,
|
||||
Name: r.name,
|
||||
RaftURL: r.url,
|
||||
EtcdURL: e.url,
|
||||
RaftVersion: version,
|
||||
Name: name,
|
||||
RaftURL: raftUrl,
|
||||
EtcdURL: etcdUrl,
|
||||
}
|
||||
}
|
||||
|
||||
@ -209,6 +209,7 @@ func (c *JoinCommand) CommandName() string {
|
||||
// Join a server to the cluster
|
||||
func (c *JoinCommand) Apply(server *raft.Server) (interface{}, error) {
|
||||
s, _ := server.StateMachine().(*store.Store)
|
||||
r, _ := server.Context().(*raftServer)
|
||||
|
||||
// check if the join command is from a previous machine, who lost all its previous log.
|
||||
e, _ := s.Get(path.Join("/_etcd/machines", c.Name), false, false, server.CommitIndex(), server.Term())
|
||||
@ -263,6 +264,7 @@ func (c *RemoveCommand) CommandName() string {
|
||||
// Remove a server from the cluster
|
||||
func (c *RemoveCommand) Apply(server *raft.Server) (interface{}, error) {
|
||||
s, _ := server.StateMachine().(*store.Store)
|
||||
r, _ := server.Context().(*raftServer)
|
||||
|
||||
// remove machine in etcd storage
|
||||
key := path.Join("_etcd/machines", c.Name)
|
||||
|
@ -35,13 +35,7 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi
|
||||
// Create transporter for raft
|
||||
raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client)
|
||||
|
||||
// Create raft server
|
||||
server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, nil, "")
|
||||
|
||||
check(err)
|
||||
|
||||
return &raftServer{
|
||||
Server: server,
|
||||
raftWrapper := &raftServer{
|
||||
version: raftVersion,
|
||||
name: name,
|
||||
url: url,
|
||||
@ -62,6 +56,14 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Create raft server
|
||||
server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, raftWrapper, "")
|
||||
check(err)
|
||||
|
||||
raftWrapper.Server = server
|
||||
|
||||
return raftWrapper
|
||||
}
|
||||
|
||||
// Start the raft server
|
||||
@ -127,7 +129,7 @@ func (r *raftServer) ListenAndServe() {
|
||||
func startAsLeader() {
|
||||
// leader need to join self as a peer
|
||||
for {
|
||||
_, err := r.Do(newJoinCommand())
|
||||
_, err := r.Do(newJoinCommand(r.version, r.Name(), r.url, e.url))
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
@ -243,7 +245,7 @@ func joinByMachine(s *raft.Server, machine string, scheme string) error {
|
||||
return fmt.Errorf("Unable to join: internal version mismatch, entire cluster must be running identical versions of etcd")
|
||||
}
|
||||
|
||||
json.NewEncoder(&b).Encode(newJoinCommand())
|
||||
json.NewEncoder(&b).Encode(newJoinCommand(r.version, r.Name(), r.url, e.url))
|
||||
|
||||
joinURL := url.URL{Host: machine, Scheme: scheme, Path: "/join"}
|
||||
|
||||
@ -270,7 +272,7 @@ func joinByMachine(s *raft.Server, machine string, scheme string) error {
|
||||
address := resp.Header.Get("Location")
|
||||
debugf("Send Join Request to %s", address)
|
||||
|
||||
json.NewEncoder(&b).Encode(newJoinCommand())
|
||||
json.NewEncoder(&b).Encode(newJoinCommand(r.version, r.Name(), r.url, e.url))
|
||||
|
||||
resp, req, err = t.Post(address, &b)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user